vector/sinks/pulsar/service.rs

use std::{
    collections::HashMap,
    sync::Arc,
    task::{Context, Poll},
};

use bytes::Bytes;
use pulsar::{
    Error as PulsarError, Executor, MultiTopicProducer, ProducerOptions, Pulsar, producer::Message,
};
use tokio::sync::Mutex;

use crate::{
    internal_events::PulsarSendingError,
    sinks::{prelude::*, pulsar::request_builder::PulsarMetadata},
};

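/// A single request to the Pulsar sink: the encoded event payload plus the
/// Pulsar-specific metadata (topic, partition key, properties, timestamp,
/// finalizers) and the standard request metadata used for telemetry.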
#[derive(Clone)]
pub(super) struct PulsarRequest {
    pub body: Bytes,
    pub metadata: PulsarMetadata,
    pub request_metadata: RequestMetadata,
}

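/// Returned by the service once Pulsar acknowledges a send, carrying the byte
/// counts reported back to the driver.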
pub struct PulsarResponse {
    byte_size: usize,
    event_byte_size: GroupedCountByteSize,
}

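// A `PulsarResponse` is only constructed after the broker acknowledges the
// send, so the event status is always `Delivered`.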
impl DriverResponse for PulsarResponse {
    fn event_status(&self) -> EventStatus {
        EventStatus::Delivered
    }

    fn events_sent(&self) -> &GroupedCountByteSize {
        &self.event_byte_size
    }

    fn bytes_sent(&self) -> Option<usize> {
        Some(self.byte_size)
    }
}

impl Finalizable for PulsarRequest {
    fn take_finalizers(&mut self) -> EventFinalizers {
        std::mem::take(&mut self.metadata.finalizers)
    }
}

impl MetaDescriptive for PulsarRequest {
    fn get_metadata(&self) -> &RequestMetadata {
        &self.request_metadata
    }

    fn metadata_mut(&mut self) -> &mut RequestMetadata {
        &mut self.request_metadata
    }
}

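/// A `Service` that publishes `PulsarRequest`s to Apache Pulsar through a
/// shared multi-topic producer.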
pub struct PulsarService<Exe: Executor> {
    // NOTE: the Mutex is needed because the `Producer` from the pulsar crate needs to be
    // `mut`, and `Service::call()` returns an owned (`'static`) future that cannot hold a
    // mutable borrow of it.
    producer: Arc<Mutex<MultiTopicProducer<Exe>>>,
}

impl<Exe: Executor> PulsarService<Exe> {
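    /// Wraps a multi-topic producer, built from the given client and options,
    /// behind an `Arc<Mutex<_>>` so the request futures can share it.
    ///
    /// A minimal sketch of a call site (the client construction below is
    /// illustrative only; the sink builds its client elsewhere, and the
    /// address is a placeholder):
    ///
    /// ```ignore
    /// let client = Pulsar::builder("pulsar://127.0.0.1:6650", TokioExecutor)
    ///     .build()
    ///     .await?;
    /// let service = PulsarService::new(client, ProducerOptions::default(), None);
    /// ```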
    pub(crate) fn new(
        pulsar_client: Pulsar<Exe>,
        producer_options: ProducerOptions,
        producer_name: Option<String>,
    ) -> PulsarService<Exe> {
        let mut builder = pulsar_client.producer().with_options(producer_options);

        if let Some(name) = producer_name {
            builder = builder.with_name(name);
        }

        let producer = builder.build_multi_topic();

        PulsarService {
            producer: Arc::new(Mutex::new(producer)),
        }
    }
}

impl<Exe: Executor> Service<PulsarRequest> for PulsarService<Exe> {
    type Response = PulsarResponse;
    type Error = PulsarError;
    type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;

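    // Readiness mirrors the availability of the producer lock: if an in-flight
    // request still holds it, report `Pending` rather than queueing another
    // request behind the mutex in `call()`.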
    fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        match self.producer.try_lock() {
            Ok(_) => Poll::Ready(Ok(())),
            Err(_) => Poll::Pending,
        }
    }

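    // Builds the Pulsar message from the request and sends it via the shared
    // producer; the mutex is only locked inside the returned future.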
    fn call(&mut self, request: PulsarRequest) -> Self::Future {
        let producer = Arc::clone(&self.producer);
        let topic = request.metadata.topic.clone();
        let event_time = request
            .metadata
            .timestamp_millis
            .to_owned()
            .map(|t| t as u64);

        Box::pin(async move {
            let body = request.body.clone();
            let byte_size = request.body.len();

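            // Pulsar message properties and partition keys are strings, so any
            // non-UTF-8 bytes in the event metadata are converted lossily.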
            let mut properties = HashMap::new();
            if let Some(props) = request.metadata.properties {
                for (key, value) in props {
                    properties.insert(key.into(), String::from_utf8_lossy(&value).to_string());
                }
            }

            let partition_key = request
                .metadata
                .key
                .map(|key| String::from_utf8_lossy(&key).to_string());

            let message = Message {
                payload: body.as_ref().to_vec(),
                properties,
                partition_key,
                event_time,
                ..Default::default()
            };

            // The locking of this mutex is not normal in `Service::call()` implementations, but we
            // can at least limit the scope of the lock by placing it here, and reduce the
            // possibility of performance impact by checking the `try_lock()` result in
            // `poll_ready()`. This sink is already limited to sequential request handling due to
            // the pulsar API, so this shouldn't impact performance from a concurrent requests
            // standpoint.
            let fut = producer
                .lock()
                .await
                .send_non_blocking(topic, message)
                .await;

            match fut {
                Ok(resp) => match resp.await {
                    Ok(_) => Ok(PulsarResponse {
                        byte_size,
                        event_byte_size: request
                            .request_metadata
                            .into_events_estimated_json_encoded_byte_size(),
                    }),
                    Err(e) => {
                        emit!(PulsarSendingError {
                            error: Box::new(PulsarError::Custom("failed to send".to_string())),
                            count: 1
                        });
                        Err(e)
                    }
                },
                Err(e) => {
                    emit!(PulsarSendingError {
                        error: Box::new(PulsarError::Custom("failed to send".to_string())),
                        count: 1,
                    });
                    Err(e)
                }
            }
        })
    }
}