vector/sinks/pulsar/
sink.rs1use std::collections::HashMap;
2
3use async_trait::async_trait;
4use bytes::Bytes;
5use pulsar::{Pulsar, TokioExecutor};
6use serde::Serialize;
7use snafu::Snafu;
8use vrl::value::KeyString;
9
10use super::{
11 config::PulsarSinkConfig, encoder::PulsarEncoder, request_builder::PulsarRequestBuilder,
12 service::PulsarService, util,
13};
14use crate::sinks::prelude::*;
15
16#[derive(Debug, Snafu)]
17#[snafu(visibility(pub(crate)))]
18pub(crate) enum BuildError {
19 #[snafu(display("creating pulsar producer failed: {}", source))]
20 CreatePulsarSink {
21 source: Box<dyn std::error::Error + Send + Sync>,
22 },
23}
24
25pub(crate) struct PulsarSink {
26 transformer: Transformer,
27 encoder: Encoder<()>,
28 service: PulsarService<TokioExecutor>,
29 config: PulsarSinkConfig,
30 topic_template: Template,
31}
32
33#[derive(Serialize)]
39pub(super) struct PulsarEvent {
40 pub(super) event: Event,
41 pub(super) topic: String,
42 pub(super) key: Option<Bytes>,
43 pub(super) properties: Option<HashMap<KeyString, Bytes>>,
44 pub(super) timestamp_millis: Option<i64>,
45}
46
47impl EventCount for PulsarEvent {
48 fn event_count(&self) -> usize {
49 1
51 }
52}
53
54impl ByteSizeOf for PulsarEvent {
55 fn allocated_bytes(&self) -> usize {
56 self.event.size_of()
57 + self.topic.size_of()
58 + self.key.as_ref().map_or(0, |bytes| bytes.size_of())
59 + self.properties.as_ref().map_or(0, |props| {
60 props
61 .iter()
62 .map(|(key, val)| key.allocated_bytes() + val.size_of())
63 .sum()
64 })
65 }
66}
67
68impl EstimatedJsonEncodedSizeOf for PulsarEvent {
69 fn estimated_json_encoded_size_of(&self) -> JsonSize {
70 self.event.estimated_json_encoded_size_of()
71 }
72}
73
74pub(crate) async fn healthcheck(config: PulsarSinkConfig) -> crate::Result<()> {
75 let client = config.create_pulsar_client().await?;
76 let topic = config.topic.render_string(&LogEvent::from_str_legacy(""))?;
77 client.lookup_topic(topic).await?;
78 Ok(())
79}
80
81impl PulsarSink {
82 pub(crate) fn new(
83 client: Pulsar<TokioExecutor>,
84 config: PulsarSinkConfig,
85 ) -> crate::Result<Self> {
86 let producer_opts = config.build_producer_options();
87 let transformer = config.encoding.transformer();
88 let serializer = config.encoding.build()?;
89 let encoder = Encoder::<()>::new(serializer);
90 let service = PulsarService::new(client, producer_opts, config.producer_name.clone());
91 let topic_template = config.topic.clone();
92
93 Ok(PulsarSink {
94 config,
95 transformer,
96 encoder,
97 service,
98 topic_template,
99 })
100 }
101
102 async fn run_inner(self: Box<Self>, input: BoxStream<'_, Event>) -> Result<(), ()> {
103 let service = ServiceBuilder::new().service(self.service);
104 let request_builder = PulsarRequestBuilder {
105 encoder: PulsarEncoder {
106 transformer: self.transformer.clone(),
107 encoder: self.encoder.clone(),
108 },
109 };
110 let sink = input
111 .filter_map(|event| {
112 std::future::ready(util::make_pulsar_event(
113 &self.topic_template,
114 &self.config,
115 event,
116 ))
117 })
118 .request_builder(default_request_builder_concurrency_limit(), request_builder)
119 .filter_map(|request| async move {
120 request
121 .map_err(|e| error!("Failed to build Pulsar request: {:?}.", e))
122 .ok()
123 })
124 .into_driver(service)
125 .protocol("tcp");
126
127 sink.run().await
128 }
129}
130
131#[async_trait]
132impl StreamSink<Event> for PulsarSink {
133 async fn run(self: Box<Self>, input: BoxStream<'_, Event>) -> Result<(), ()> {
134 self.run_inner(input).await
135 }
136}