vector/sinks/pulsar/
request_builder.rs

1use bytes::Bytes;
2use std::collections::HashMap;
3use std::io;
4
5use crate::event::KeyString;
6use crate::sinks::{
7    prelude::*,
8    pulsar::{encoder::PulsarEncoder, service::PulsarRequest, sink::PulsarEvent},
9};
10
11#[derive(Clone)]
12pub(super) struct PulsarMetadata {
13    pub finalizers: EventFinalizers,
14    pub key: Option<Bytes>,
15    pub properties: Option<HashMap<KeyString, Bytes>>,
16    pub timestamp_millis: Option<i64>,
17    pub topic: String,
18}
19
20pub(super) struct PulsarRequestBuilder {
21    pub(super) encoder: PulsarEncoder,
22}
23
24impl RequestBuilder<PulsarEvent> for PulsarRequestBuilder {
25    type Metadata = PulsarMetadata;
26    type Events = Event;
27    type Encoder = PulsarEncoder;
28    type Payload = Bytes;
29    type Request = PulsarRequest;
30    type Error = io::Error;
31
32    fn compression(&self) -> Compression {
33        // Compression is handled by the pulsar crate through the producer settings.
34        Compression::None
35    }
36
37    fn encoder(&self) -> &Self::Encoder {
38        &self.encoder
39    }
40
41    fn split_input(
42        &self,
43        mut input: PulsarEvent,
44    ) -> (Self::Metadata, RequestMetadataBuilder, Self::Events) {
45        let builder = RequestMetadataBuilder::from_event(&input.event);
46        let metadata = PulsarMetadata {
47            finalizers: input.event.take_finalizers(),
48            key: input.key,
49            timestamp_millis: input.timestamp_millis,
50            properties: input.properties,
51            topic: input.topic,
52        };
53        (metadata, builder, input.event)
54    }
55
56    fn build_request(
57        &self,
58        metadata: Self::Metadata,
59        request_metadata: RequestMetadata,
60        payload: EncodeResult<Self::Payload>,
61    ) -> Self::Request {
62        let body = payload.into_payload();
63        PulsarRequest {
64            body,
65            metadata,
66            request_metadata,
67        }
68    }
69}