vector/sinks/pulsar/
request_builder.rs1use bytes::Bytes;
2use std::collections::HashMap;
3use std::io;
4
5use crate::event::KeyString;
6use crate::sinks::{
7 prelude::*,
8 pulsar::{encoder::PulsarEncoder, service::PulsarRequest, sink::PulsarEvent},
9};
10
11#[derive(Clone)]
12pub(super) struct PulsarMetadata {
13 pub finalizers: EventFinalizers,
14 pub key: Option<Bytes>,
15 pub properties: Option<HashMap<KeyString, Bytes>>,
16 pub timestamp_millis: Option<i64>,
17 pub topic: String,
18}
19
20pub(super) struct PulsarRequestBuilder {
21 pub(super) encoder: PulsarEncoder,
22}
23
24impl RequestBuilder<PulsarEvent> for PulsarRequestBuilder {
25 type Metadata = PulsarMetadata;
26 type Events = Event;
27 type Encoder = PulsarEncoder;
28 type Payload = Bytes;
29 type Request = PulsarRequest;
30 type Error = io::Error;
31
32 fn compression(&self) -> Compression {
33 Compression::None
35 }
36
37 fn encoder(&self) -> &Self::Encoder {
38 &self.encoder
39 }
40
41 fn split_input(
42 &self,
43 mut input: PulsarEvent,
44 ) -> (Self::Metadata, RequestMetadataBuilder, Self::Events) {
45 let builder = RequestMetadataBuilder::from_event(&input.event);
46 let metadata = PulsarMetadata {
47 finalizers: input.event.take_finalizers(),
48 key: input.key,
49 timestamp_millis: input.timestamp_millis,
50 properties: input.properties,
51 topic: input.topic,
52 };
53 (metadata, builder, input.event)
54 }
55
56 fn build_request(
57 &self,
58 metadata: Self::Metadata,
59 request_metadata: RequestMetadata,
60 payload: EncodeResult<Self::Payload>,
61 ) -> Self::Request {
62 let body = payload.into_payload();
63 PulsarRequest {
64 body,
65 metadata,
66 request_metadata,
67 }
68 }
69}