service.rsuse std::collections::HashMap;
use std::sync::Arc;
use std::task::{Context, Poll};
use bytes::Bytes;
use pulsar::producer::Message;
use pulsar::{Error as PulsarError, Executor, MultiTopicProducer, ProducerOptions, Pulsar};
use tokio::sync::Mutex;
use crate::internal_events::PulsarSendingError;
use crate::sinks::{prelude::*, pulsar::request_builder::PulsarMetadata};
pub(super) struct PulsarRequest {
pub body: Bytes,
pub metadata: PulsarMetadata,
pub request_metadata: RequestMetadata,
pub struct PulsarResponse {
byte_size: usize,
event_byte_size: GroupedCountByteSize,
impl DriverResponse for PulsarResponse {
fn event_status(&self) -> EventStatus {
fn events_sent(&self) -> &GroupedCountByteSize {
fn bytes_sent(&self) -> Option<usize> {
impl Finalizable for PulsarRequest {
fn take_finalizers(&mut self) -> EventFinalizers {
std::mem::take(&mut self.metadata.finalizers)
impl MetaDescriptive for PulsarRequest {
fn get_metadata(&self) -> &RequestMetadata {
fn metadata_mut(&mut self) -> &mut RequestMetadata {
&mut self.request_metadata
pub struct PulsarService<Exe: Executor> {
producer: Arc<Mutex<MultiTopicProducer<Exe>>>,
impl<Exe: Executor> PulsarService<Exe> {
pub(crate) fn new(
pulsar_client: Pulsar<Exe>,
producer_options: ProducerOptions,
producer_name: Option<String>,
) -> PulsarService<Exe> {
let mut builder = pulsar_client.producer().with_options(producer_options);
if let Some(name) = producer_name {
builder = builder.with_name(name);
let producer = builder.build_multi_topic();
PulsarService {
producer: Arc::new(Mutex::new(producer)),
impl<Exe: Executor> Service<PulsarRequest> for PulsarService<Exe> {
type Response = PulsarResponse;
type Error = PulsarError;
type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;
fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
match self.producer.try_lock() {
Ok(_) => Poll::Ready(Ok(())),
Err(_) => Poll::Pending,
fn call(&mut self, request: PulsarRequest) -> Self::Future {
let producer = Arc::clone(&self.producer);
let topic = request.metadata.topic.clone();
let event_time = request
.map(|t| t as u64);
Box::pin(async move {
let body = request.body.clone();
let byte_size = request.body.len();
let mut properties = HashMap::new();
if let Some(props) = {
for (key, value) in props {
properties.insert(key.into(), String::from_utf8_lossy(&value).to_string());
let partition_key = request
.map(|key| String::from_utf8_lossy(&key).to_string());
let message = Message {
payload: body.as_ref().to_vec(),
let fut = producer
.send_non_blocking(topic, message)
match fut {
Ok(resp) => match resp.await {
Ok(_) => Ok(PulsarResponse {
event_byte_size: request
Err(e) => {
emit!(PulsarSendingError {
error: Box::new(PulsarError::Custom("failed to send".to_string())),
count: 1
Err(e) => {
emit!(PulsarSendingError {
error: Box::new(PulsarError::Custom("failed to send".to_string())),
count: 1,