vector/sinks/pulsar/
service.rs1use std::{
2 collections::HashMap,
3 sync::Arc,
4 task::{Context, Poll},
5};
6
7use bytes::Bytes;
8use pulsar::{
9 Error as PulsarError, Executor, MultiTopicProducer, ProducerOptions, Pulsar, producer::Message,
10};
11use tokio::sync::Mutex;
12
13use crate::{
14 internal_events::PulsarSendingError,
15 sinks::{prelude::*, pulsar::request_builder::PulsarMetadata},
16};
17
18#[derive(Clone)]
19pub(super) struct PulsarRequest {
20 pub body: Bytes,
21 pub metadata: PulsarMetadata,
22 pub request_metadata: RequestMetadata,
23}
24
25pub struct PulsarResponse {
26 byte_size: usize,
27 event_byte_size: GroupedCountByteSize,
28}
29
30impl DriverResponse for PulsarResponse {
31 fn event_status(&self) -> EventStatus {
32 EventStatus::Delivered
33 }
34
35 fn events_sent(&self) -> &GroupedCountByteSize {
36 &self.event_byte_size
37 }
38
39 fn bytes_sent(&self) -> Option<usize> {
40 Some(self.byte_size)
41 }
42}
43
44impl Finalizable for PulsarRequest {
45 fn take_finalizers(&mut self) -> EventFinalizers {
46 std::mem::take(&mut self.metadata.finalizers)
47 }
48}
49
50impl MetaDescriptive for PulsarRequest {
51 fn get_metadata(&self) -> &RequestMetadata {
52 &self.request_metadata
53 }
54
55 fn metadata_mut(&mut self) -> &mut RequestMetadata {
56 &mut self.request_metadata
57 }
58}
59
60pub struct PulsarService<Exe: Executor> {
61 producer: Arc<Mutex<MultiTopicProducer<Exe>>>,
64}
65
66impl<Exe: Executor> PulsarService<Exe> {
67 pub(crate) fn new(
68 pulsar_client: Pulsar<Exe>,
69 producer_options: ProducerOptions,
70 producer_name: Option<String>,
71 ) -> PulsarService<Exe> {
72 let mut builder = pulsar_client.producer().with_options(producer_options);
73
74 if let Some(name) = producer_name {
75 builder = builder.with_name(name);
76 }
77
78 let producer = builder.build_multi_topic();
79
80 PulsarService {
81 producer: Arc::new(Mutex::new(producer)),
82 }
83 }
84}
85
86impl<Exe: Executor> Service<PulsarRequest> for PulsarService<Exe> {
87 type Response = PulsarResponse;
88 type Error = PulsarError;
89 type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;
90
91 fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
92 match self.producer.try_lock() {
93 Ok(_) => Poll::Ready(Ok(())),
94 Err(_) => Poll::Pending,
95 }
96 }
97
98 fn call(&mut self, request: PulsarRequest) -> Self::Future {
99 let producer = Arc::clone(&self.producer);
100 let topic = request.metadata.topic.clone();
101 let event_time = request
102 .metadata
103 .timestamp_millis
104 .to_owned()
105 .map(|t| t as u64);
106
107 Box::pin(async move {
108 let body = request.body.clone();
109 let byte_size = request.body.len();
110
111 let mut properties = HashMap::new();
112 if let Some(props) = request.metadata.properties {
113 for (key, value) in props {
114 properties.insert(key.into(), String::from_utf8_lossy(&value).to_string());
115 }
116 }
117
118 let partition_key = request
119 .metadata
120 .key
121 .map(|key| String::from_utf8_lossy(&key).to_string());
122
123 let message = Message {
124 payload: body.as_ref().to_vec(),
125 properties,
126 partition_key,
127 event_time,
128 ..Default::default()
129 };
130
131 let fut = producer
138 .lock()
139 .await
140 .send_non_blocking(topic, message)
141 .await;
142
143 match fut {
144 Ok(resp) => match resp.await {
145 Ok(_) => Ok(PulsarResponse {
146 byte_size,
147 event_byte_size: request
148 .request_metadata
149 .into_events_estimated_json_encoded_byte_size(),
150 }),
151 Err(e) => {
152 emit!(PulsarSendingError {
153 error: Box::new(PulsarError::Custom("failed to send".to_string())),
154 count: 1
155 });
156 Err(e)
157 }
158 },
159 Err(e) => {
160 emit!(PulsarSendingError {
161 error: Box::new(PulsarError::Custom("failed to send".to_string())),
162 count: 1,
163 });
164 Err(e)
165 }
166 }
167 })
168 }
169}