vector/sinks/pulsar/
request_builder.rs

1use std::{collections::HashMap, io};
2
3use bytes::Bytes;
4
5use crate::{
6    event::KeyString,
7    sinks::{
8        prelude::*,
9        pulsar::{encoder::PulsarEncoder, service::PulsarRequest, sink::PulsarEvent},
10    },
11};
12
13#[derive(Clone)]
14pub(super) struct PulsarMetadata {
15    pub finalizers: EventFinalizers,
16    pub key: Option<Bytes>,
17    pub properties: Option<HashMap<KeyString, Bytes>>,
18    pub timestamp_millis: Option<i64>,
19    pub topic: String,
20}
21
22pub(super) struct PulsarRequestBuilder {
23    pub(super) encoder: PulsarEncoder,
24}
25
26impl RequestBuilder<PulsarEvent> for PulsarRequestBuilder {
27    type Metadata = PulsarMetadata;
28    type Events = Event;
29    type Encoder = PulsarEncoder;
30    type Payload = Bytes;
31    type Request = PulsarRequest;
32    type Error = io::Error;
33
34    fn compression(&self) -> Compression {
35        // Compression is handled by the pulsar crate through the producer settings.
36        Compression::None
37    }
38
39    fn encoder(&self) -> &Self::Encoder {
40        &self.encoder
41    }
42
43    fn split_input(
44        &self,
45        mut input: PulsarEvent,
46    ) -> (Self::Metadata, RequestMetadataBuilder, Self::Events) {
47        let builder = RequestMetadataBuilder::from_event(&input.event);
48        let metadata = PulsarMetadata {
49            finalizers: input.event.take_finalizers(),
50            key: input.key,
51            timestamp_millis: input.timestamp_millis,
52            properties: input.properties,
53            topic: input.topic,
54        };
55        (metadata, builder, input.event)
56    }
57
58    fn build_request(
59        &self,
60        metadata: Self::Metadata,
61        request_metadata: RequestMetadata,
62        payload: EncodeResult<Self::Payload>,
63    ) -> Self::Request {
64        let body = payload.into_payload();
65        PulsarRequest {
66            body,
67            metadata,
68            request_metadata,
69        }
70    }
71}