vector/sinks/pulsar/
service.rs1use std::collections::HashMap;
2use std::sync::Arc;
3use std::task::{Context, Poll};
4
5use bytes::Bytes;
6use pulsar::producer::Message;
7use pulsar::{Error as PulsarError, Executor, MultiTopicProducer, ProducerOptions, Pulsar};
8use tokio::sync::Mutex;
9
10use crate::internal_events::PulsarSendingError;
11use crate::sinks::{prelude::*, pulsar::request_builder::PulsarMetadata};
12
13#[derive(Clone)]
14pub(super) struct PulsarRequest {
15 pub body: Bytes,
16 pub metadata: PulsarMetadata,
17 pub request_metadata: RequestMetadata,
18}
19
20pub struct PulsarResponse {
21 byte_size: usize,
22 event_byte_size: GroupedCountByteSize,
23}
24
25impl DriverResponse for PulsarResponse {
26 fn event_status(&self) -> EventStatus {
27 EventStatus::Delivered
28 }
29
30 fn events_sent(&self) -> &GroupedCountByteSize {
31 &self.event_byte_size
32 }
33
34 fn bytes_sent(&self) -> Option<usize> {
35 Some(self.byte_size)
36 }
37}
38
39impl Finalizable for PulsarRequest {
40 fn take_finalizers(&mut self) -> EventFinalizers {
41 std::mem::take(&mut self.metadata.finalizers)
42 }
43}
44
45impl MetaDescriptive for PulsarRequest {
46 fn get_metadata(&self) -> &RequestMetadata {
47 &self.request_metadata
48 }
49
50 fn metadata_mut(&mut self) -> &mut RequestMetadata {
51 &mut self.request_metadata
52 }
53}
54
55pub struct PulsarService<Exe: Executor> {
56 producer: Arc<Mutex<MultiTopicProducer<Exe>>>,
59}
60
61impl<Exe: Executor> PulsarService<Exe> {
62 pub(crate) fn new(
63 pulsar_client: Pulsar<Exe>,
64 producer_options: ProducerOptions,
65 producer_name: Option<String>,
66 ) -> PulsarService<Exe> {
67 let mut builder = pulsar_client.producer().with_options(producer_options);
68
69 if let Some(name) = producer_name {
70 builder = builder.with_name(name);
71 }
72
73 let producer = builder.build_multi_topic();
74
75 PulsarService {
76 producer: Arc::new(Mutex::new(producer)),
77 }
78 }
79}
80
81impl<Exe: Executor> Service<PulsarRequest> for PulsarService<Exe> {
82 type Response = PulsarResponse;
83 type Error = PulsarError;
84 type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;
85
86 fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
87 match self.producer.try_lock() {
88 Ok(_) => Poll::Ready(Ok(())),
89 Err(_) => Poll::Pending,
90 }
91 }
92
93 fn call(&mut self, request: PulsarRequest) -> Self::Future {
94 let producer = Arc::clone(&self.producer);
95 let topic = request.metadata.topic.clone();
96 let event_time = request
97 .metadata
98 .timestamp_millis
99 .to_owned()
100 .map(|t| t as u64);
101
102 Box::pin(async move {
103 let body = request.body.clone();
104 let byte_size = request.body.len();
105
106 let mut properties = HashMap::new();
107 if let Some(props) = request.metadata.properties {
108 for (key, value) in props {
109 properties.insert(key.into(), String::from_utf8_lossy(&value).to_string());
110 }
111 }
112
113 let partition_key = request
114 .metadata
115 .key
116 .map(|key| String::from_utf8_lossy(&key).to_string());
117
118 let message = Message {
119 payload: body.as_ref().to_vec(),
120 properties,
121 partition_key,
122 event_time,
123 ..Default::default()
124 };
125
126 let fut = producer
133 .lock()
134 .await
135 .send_non_blocking(topic, message)
136 .await;
137
138 match fut {
139 Ok(resp) => match resp.await {
140 Ok(_) => Ok(PulsarResponse {
141 byte_size,
142 event_byte_size: request
143 .request_metadata
144 .into_events_estimated_json_encoded_byte_size(),
145 }),
146 Err(e) => {
147 emit!(PulsarSendingError {
148 error: Box::new(PulsarError::Custom("failed to send".to_string())),
149 count: 1
150 });
151 Err(e)
152 }
153 },
154 Err(e) => {
155 emit!(PulsarSendingError {
156 error: Box::new(PulsarError::Custom("failed to send".to_string())),
157 count: 1,
158 });
159 Err(e)
160 }
161 }
162 })
163 }
164}