vector/sinks/pulsar/
service.rs

1use std::collections::HashMap;
2use std::sync::Arc;
3use std::task::{Context, Poll};
4
5use bytes::Bytes;
6use pulsar::producer::Message;
7use pulsar::{Error as PulsarError, Executor, MultiTopicProducer, ProducerOptions, Pulsar};
8use tokio::sync::Mutex;
9
10use crate::internal_events::PulsarSendingError;
11use crate::sinks::{prelude::*, pulsar::request_builder::PulsarMetadata};
12
13#[derive(Clone)]
14pub(super) struct PulsarRequest {
15    pub body: Bytes,
16    pub metadata: PulsarMetadata,
17    pub request_metadata: RequestMetadata,
18}
19
20pub struct PulsarResponse {
21    byte_size: usize,
22    event_byte_size: GroupedCountByteSize,
23}
24
25impl DriverResponse for PulsarResponse {
26    fn event_status(&self) -> EventStatus {
27        EventStatus::Delivered
28    }
29
30    fn events_sent(&self) -> &GroupedCountByteSize {
31        &self.event_byte_size
32    }
33
34    fn bytes_sent(&self) -> Option<usize> {
35        Some(self.byte_size)
36    }
37}
38
39impl Finalizable for PulsarRequest {
40    fn take_finalizers(&mut self) -> EventFinalizers {
41        std::mem::take(&mut self.metadata.finalizers)
42    }
43}
44
45impl MetaDescriptive for PulsarRequest {
46    fn get_metadata(&self) -> &RequestMetadata {
47        &self.request_metadata
48    }
49
50    fn metadata_mut(&mut self) -> &mut RequestMetadata {
51        &mut self.request_metadata
52    }
53}
54
55pub struct PulsarService<Exe: Executor> {
56    // NOTE: the reason for the Mutex here is because the `Producer` from the pulsar crate
57    // needs to be `mut`, and the `Service::call()` returns a Future.
58    producer: Arc<Mutex<MultiTopicProducer<Exe>>>,
59}
60
61impl<Exe: Executor> PulsarService<Exe> {
62    pub(crate) fn new(
63        pulsar_client: Pulsar<Exe>,
64        producer_options: ProducerOptions,
65        producer_name: Option<String>,
66    ) -> PulsarService<Exe> {
67        let mut builder = pulsar_client.producer().with_options(producer_options);
68
69        if let Some(name) = producer_name {
70            builder = builder.with_name(name);
71        }
72
73        let producer = builder.build_multi_topic();
74
75        PulsarService {
76            producer: Arc::new(Mutex::new(producer)),
77        }
78    }
79}
80
81impl<Exe: Executor> Service<PulsarRequest> for PulsarService<Exe> {
82    type Response = PulsarResponse;
83    type Error = PulsarError;
84    type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;
85
86    fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
87        match self.producer.try_lock() {
88            Ok(_) => Poll::Ready(Ok(())),
89            Err(_) => Poll::Pending,
90        }
91    }
92
93    fn call(&mut self, request: PulsarRequest) -> Self::Future {
94        let producer = Arc::clone(&self.producer);
95        let topic = request.metadata.topic.clone();
96        let event_time = request
97            .metadata
98            .timestamp_millis
99            .to_owned()
100            .map(|t| t as u64);
101
102        Box::pin(async move {
103            let body = request.body.clone();
104            let byte_size = request.body.len();
105
106            let mut properties = HashMap::new();
107            if let Some(props) = request.metadata.properties {
108                for (key, value) in props {
109                    properties.insert(key.into(), String::from_utf8_lossy(&value).to_string());
110                }
111            }
112
113            let partition_key = request
114                .metadata
115                .key
116                .map(|key| String::from_utf8_lossy(&key).to_string());
117
118            let message = Message {
119                payload: body.as_ref().to_vec(),
120                properties,
121                partition_key,
122                event_time,
123                ..Default::default()
124            };
125
126            // The locking if this mutex is not normal in `Service::call()` implementations, but we
127            // at least can limit the scope of the lock by placing it here, and reduce the
128            // possibility of performance impact by checking the `try_lock()` result in
129            // `poll_ready()`. This sink is already limited to sequential request handling due to
130            // the pulsar API, so this shouldn't impact performance from a concurrent requests
131            // standpoint.
132            let fut = producer
133                .lock()
134                .await
135                .send_non_blocking(topic, message)
136                .await;
137
138            match fut {
139                Ok(resp) => match resp.await {
140                    Ok(_) => Ok(PulsarResponse {
141                        byte_size,
142                        event_byte_size: request
143                            .request_metadata
144                            .into_events_estimated_json_encoded_byte_size(),
145                    }),
146                    Err(e) => {
147                        emit!(PulsarSendingError {
148                            error: Box::new(PulsarError::Custom("failed to send".to_string())),
149                            count: 1
150                        });
151                        Err(e)
152                    }
153                },
154                Err(e) => {
155                    emit!(PulsarSendingError {
156                        error: Box::new(PulsarError::Custom("failed to send".to_string())),
157                        count: 1,
158                    });
159                    Err(e)
160                }
161            }
162        })
163    }
164}