vector/sinks/pulsar/
sink.rs

1use std::collections::HashMap;
2
3use async_trait::async_trait;
4use bytes::Bytes;
5use pulsar::{Pulsar, TokioExecutor};
6use serde::Serialize;
7use snafu::Snafu;
8use vrl::value::KeyString;
9
10use super::{
11    config::PulsarSinkConfig, encoder::PulsarEncoder, request_builder::PulsarRequestBuilder,
12    service::PulsarService, util,
13};
14use crate::sinks::prelude::*;
15
/// Error reported when the Pulsar producer for this sink cannot be created.
#[derive(Debug, Snafu)]
#[snafu(visibility(pub(crate)))]
pub(crate) enum BuildError {
    // Displayed as "creating pulsar producer failed: <source>".
    #[snafu(display("creating pulsar producer failed: {}", source))]
    CreatePulsarSink {
        // Underlying cause, type-erased behind a boxed `Error + Send + Sync` trait object.
        source: Box<dyn std::error::Error + Send + Sync>,
    },
}
24
/// Stream sink that turns incoming events into Pulsar requests and drives them
/// through a `PulsarService`.
pub(crate) struct PulsarSink {
    /// Obtained from `config.encoding.transformer()`; handed to the `PulsarEncoder`.
    transformer: Transformer,
    /// Wraps the serializer built from `config.encoding`; handed to the `PulsarEncoder`.
    encoder: Encoder<()>,
    /// Service created from the Pulsar client, the producer options and the producer name.
    service: PulsarService<TokioExecutor>,
    /// Full sink configuration; passed to `util::make_pulsar_event` for every event.
    config: PulsarSinkConfig,
    /// Copy of `config.topic`; passed to `util::make_pulsar_event` for every event.
    topic_template: Template,
}
32
/// Stores the event together with the values extracted for it: topic, key,
/// properties and timestamp.
/// This is passed into the `RequestBuilder`, which then splits it into the event
/// itself and the metadata holding those extracted values.
/// This event needs to be created prior to building the request so we can filter out
/// any events that error whilst rendering the templates.
#[derive(Serialize)]
pub(super) struct PulsarEvent {
    /// The wrapped event.
    pub(super) event: Event,
    /// Topic for this event (presumably rendered from the sink's topic template —
    /// confirm in `util::make_pulsar_event`).
    pub(super) topic: String,
    /// Optional key bytes extracted for this event.
    pub(super) key: Option<Bytes>,
    /// Optional property name/value pairs extracted for this event.
    pub(super) properties: Option<HashMap<KeyString, Bytes>>,
    /// Optional timestamp; the field name indicates milliseconds
    /// (NOTE(review): epoch and time zone are not visible here — confirm).
    pub(super) timestamp_millis: Option<i64>,
}
46
impl EventCount for PulsarEvent {
    /// Returns the number of events carried by this value, which is always `1`.
    fn event_count(&self) -> usize {
        // A PulsarEvent represents one event.
        1
    }
}
53
54impl ByteSizeOf for PulsarEvent {
55    fn allocated_bytes(&self) -> usize {
56        self.event.size_of()
57            + self.topic.size_of()
58            + self.key.as_ref().map_or(0, |bytes| bytes.size_of())
59            + self.properties.as_ref().map_or(0, |props| {
60                props
61                    .iter()
62                    .map(|(key, val)| key.allocated_bytes() + val.size_of())
63                    .sum()
64            })
65    }
66}
67
68impl EstimatedJsonEncodedSizeOf for PulsarEvent {
69    fn estimated_json_encoded_size_of(&self) -> JsonSize {
70        self.event.estimated_json_encoded_size_of()
71    }
72}
73
74pub(crate) async fn healthcheck(config: PulsarSinkConfig) -> crate::Result<()> {
75    let client = config.create_pulsar_client().await?;
76    let topic = config.topic.render_string(&LogEvent::from_str_legacy(""))?;
77    client.lookup_topic(topic).await?;
78    Ok(())
79}
80
81impl PulsarSink {
82    pub(crate) fn new(
83        client: Pulsar<TokioExecutor>,
84        config: PulsarSinkConfig,
85    ) -> crate::Result<Self> {
86        let producer_opts = config.build_producer_options();
87        let transformer = config.encoding.transformer();
88        let serializer = config.encoding.build()?;
89        let encoder = Encoder::<()>::new(serializer);
90        let service = PulsarService::new(client, producer_opts, config.producer_name.clone());
91        let topic_template = config.topic.clone();
92
93        Ok(PulsarSink {
94            config,
95            transformer,
96            encoder,
97            service,
98            topic_template,
99        })
100    }
101
102    async fn run_inner(self: Box<Self>, input: BoxStream<'_, Event>) -> Result<(), ()> {
103        let service = ServiceBuilder::new().service(self.service);
104        let request_builder = PulsarRequestBuilder {
105            encoder: PulsarEncoder {
106                transformer: self.transformer.clone(),
107                encoder: self.encoder.clone(),
108            },
109        };
110        let sink = input
111            .filter_map(|event| {
112                std::future::ready(util::make_pulsar_event(
113                    &self.topic_template,
114                    &self.config,
115                    event,
116                ))
117            })
118            .request_builder(default_request_builder_concurrency_limit(), request_builder)
119            .filter_map(|request| async move {
120                request
121                    .map_err(|e| error!("Failed to build Pulsar request: {:?}.", e))
122                    .ok()
123            })
124            .into_driver(service)
125            .protocol("tcp");
126
127        sink.run().await
128    }
129}
130
#[async_trait]
impl StreamSink<Event> for PulsarSink {
    /// Runs the sink over `input` by delegating to `run_inner` and returning its result.
    async fn run(self: Box<Self>, input: BoxStream<'_, Event>) -> Result<(), ()> {
        self.run_inner(input).await
    }
}
136}