vector/sinks/pulsar/
sink.rs

1use async_trait::async_trait;
2use bytes::Bytes;
3use pulsar::{Pulsar, TokioExecutor};
4use serde::Serialize;
5use snafu::Snafu;
6use std::collections::HashMap;
7use vrl::value::KeyString;
8
9use super::{
10    config::PulsarSinkConfig, encoder::PulsarEncoder, request_builder::PulsarRequestBuilder,
11    service::PulsarService, util,
12};
13use crate::sinks::prelude::*;
14
/// Errors raised while building the Pulsar sink.
#[derive(Debug, Snafu)]
#[snafu(visibility(pub(crate)))]
pub(crate) enum BuildError {
    /// An underlying error, carried as `source`, reported as a failure to create the
    /// Pulsar producer.
    #[snafu(display("creating pulsar producer failed: {}", source))]
    CreatePulsarSink {
        /// The boxed error that caused the failure.
        source: Box<dyn std::error::Error + Send + Sync>,
    },
}
23
/// State of the Pulsar sink, assembled by `PulsarSink::new` and consumed when the sink runs.
pub(crate) struct PulsarSink {
    /// Obtained from `config.encoding.transformer()`; handed to the `PulsarEncoder`.
    transformer: Transformer,
    /// Encoder wrapping the serializer built from `config.encoding`.
    encoder: Encoder<()>,
    /// Service that the request driver is built around.
    service: PulsarService<TokioExecutor>,
    /// Full sink configuration; passed to `util::make_pulsar_event` for every event.
    config: PulsarSinkConfig,
    /// Copy of `config.topic`; passed to `util::make_pulsar_event` alongside `config`.
    topic_template: Template,
}
31
/// Stores the event together with the extracted topic, key, properties and timestamp.
/// This is passed into the `RequestBuilder`, which then splits it into the event
/// and the metadata holding the extracted values.
/// This event needs to be created prior to building the request so we can filter out
/// any events that error whilst rendering the templates.
#[derive(Serialize)]
pub(super) struct PulsarEvent {
    /// The event itself.
    pub(super) event: Event,
    /// Topic for this event.
    pub(super) topic: String,
    /// Optional message key.
    pub(super) key: Option<Bytes>,
    /// Optional message properties, keyed by property name.
    pub(super) properties: Option<HashMap<KeyString, Bytes>>,
    /// Optional timestamp in milliseconds.
    // NOTE(review): presumably milliseconds since the Unix epoch — confirm against
    // `util::make_pulsar_event`.
    pub(super) timestamp_millis: Option<i64>,
}
45
46impl EventCount for PulsarEvent {
47    fn event_count(&self) -> usize {
48        // A PulsarEvent represents one event.
49        1
50    }
51}
52
53impl ByteSizeOf for PulsarEvent {
54    fn allocated_bytes(&self) -> usize {
55        self.event.size_of()
56            + self.topic.size_of()
57            + self.key.as_ref().map_or(0, |bytes| bytes.size_of())
58            + self.properties.as_ref().map_or(0, |props| {
59                props
60                    .iter()
61                    .map(|(key, val)| key.allocated_bytes() + val.size_of())
62                    .sum()
63            })
64    }
65}
66
67impl EstimatedJsonEncodedSizeOf for PulsarEvent {
68    fn estimated_json_encoded_size_of(&self) -> JsonSize {
69        self.event.estimated_json_encoded_size_of()
70    }
71}
72
73pub(crate) async fn healthcheck(config: PulsarSinkConfig) -> crate::Result<()> {
74    let client = config.create_pulsar_client().await?;
75    let topic = config.topic.render_string(&LogEvent::from_str_legacy(""))?;
76    client.lookup_topic(topic).await?;
77    Ok(())
78}
79
80impl PulsarSink {
81    pub(crate) fn new(
82        client: Pulsar<TokioExecutor>,
83        config: PulsarSinkConfig,
84    ) -> crate::Result<Self> {
85        let producer_opts = config.build_producer_options();
86        let transformer = config.encoding.transformer();
87        let serializer = config.encoding.build()?;
88        let encoder = Encoder::<()>::new(serializer);
89        let service = PulsarService::new(client, producer_opts, config.producer_name.clone());
90        let topic_template = config.topic.clone();
91
92        Ok(PulsarSink {
93            config,
94            transformer,
95            encoder,
96            service,
97            topic_template,
98        })
99    }
100
101    async fn run_inner(self: Box<Self>, input: BoxStream<'_, Event>) -> Result<(), ()> {
102        let service = ServiceBuilder::new().service(self.service);
103        let request_builder = PulsarRequestBuilder {
104            encoder: PulsarEncoder {
105                transformer: self.transformer.clone(),
106                encoder: self.encoder.clone(),
107            },
108        };
109        let sink = input
110            .filter_map(|event| {
111                std::future::ready(util::make_pulsar_event(
112                    &self.topic_template,
113                    &self.config,
114                    event,
115                ))
116            })
117            .request_builder(default_request_builder_concurrency_limit(), request_builder)
118            .filter_map(|request| async move {
119                request
120                    .map_err(|e| error!("Failed to build Pulsar request: {:?}.", e))
121                    .ok()
122            })
123            .into_driver(service)
124            .protocol("tcp");
125
126        sink.run().await
127    }
128}
129
130#[async_trait]
131impl StreamSink<Event> for PulsarSink {
132    async fn run(self: Box<Self>, input: BoxStream<'_, Event>) -> Result<(), ()> {
133        self.run_inner(input).await
134    }
135}