vector/sinks/pulsar/
request_builder.rs1use std::{collections::HashMap, io};
2
3use bytes::Bytes;
4
5use crate::{
6 event::KeyString,
7 sinks::{
8 prelude::*,
9 pulsar::{encoder::PulsarEncoder, service::PulsarRequest, sink::PulsarEvent},
10 },
11};
12
13#[derive(Clone)]
14pub(super) struct PulsarMetadata {
15 pub finalizers: EventFinalizers,
16 pub key: Option<Bytes>,
17 pub properties: Option<HashMap<KeyString, Bytes>>,
18 pub timestamp_millis: Option<i64>,
19 pub topic: String,
20}
21
22pub(super) struct PulsarRequestBuilder {
23 pub(super) encoder: PulsarEncoder,
24}
25
26impl RequestBuilder<PulsarEvent> for PulsarRequestBuilder {
27 type Metadata = PulsarMetadata;
28 type Events = Event;
29 type Encoder = PulsarEncoder;
30 type Payload = Bytes;
31 type Request = PulsarRequest;
32 type Error = io::Error;
33
34 fn compression(&self) -> Compression {
35 Compression::None
37 }
38
39 fn encoder(&self) -> &Self::Encoder {
40 &self.encoder
41 }
42
43 fn split_input(
44 &self,
45 mut input: PulsarEvent,
46 ) -> (Self::Metadata, RequestMetadataBuilder, Self::Events) {
47 let builder = RequestMetadataBuilder::from_event(&input.event);
48 let metadata = PulsarMetadata {
49 finalizers: input.event.take_finalizers(),
50 key: input.key,
51 timestamp_millis: input.timestamp_millis,
52 properties: input.properties,
53 topic: input.topic,
54 };
55 (metadata, builder, input.event)
56 }
57
58 fn build_request(
59 &self,
60 metadata: Self::Metadata,
61 request_metadata: RequestMetadata,
62 payload: EncodeResult<Self::Payload>,
63 ) -> Self::Request {
64 let body = payload.into_payload();
65 PulsarRequest {
66 body,
67 metadata,
68 request_metadata,
69 }
70 }
71}