vector/sinks/pulsar/
sink.rs1use async_trait::async_trait;
2use bytes::Bytes;
3use pulsar::{Pulsar, TokioExecutor};
4use serde::Serialize;
5use snafu::Snafu;
6use std::collections::HashMap;
7use vrl::value::KeyString;
8
9use super::{
10 config::PulsarSinkConfig, encoder::PulsarEncoder, request_builder::PulsarRequestBuilder,
11 service::PulsarService, util,
12};
13use crate::sinks::prelude::*;
14
15#[derive(Debug, Snafu)]
16#[snafu(visibility(pub(crate)))]
17pub(crate) enum BuildError {
18 #[snafu(display("creating pulsar producer failed: {}", source))]
19 CreatePulsarSink {
20 source: Box<dyn std::error::Error + Send + Sync>,
21 },
22}
23
24pub(crate) struct PulsarSink {
25 transformer: Transformer,
26 encoder: Encoder<()>,
27 service: PulsarService<TokioExecutor>,
28 config: PulsarSinkConfig,
29 topic_template: Template,
30}
31
32#[derive(Serialize)]
38pub(super) struct PulsarEvent {
39 pub(super) event: Event,
40 pub(super) topic: String,
41 pub(super) key: Option<Bytes>,
42 pub(super) properties: Option<HashMap<KeyString, Bytes>>,
43 pub(super) timestamp_millis: Option<i64>,
44}
45
46impl EventCount for PulsarEvent {
47 fn event_count(&self) -> usize {
48 1
50 }
51}
52
53impl ByteSizeOf for PulsarEvent {
54 fn allocated_bytes(&self) -> usize {
55 self.event.size_of()
56 + self.topic.size_of()
57 + self.key.as_ref().map_or(0, |bytes| bytes.size_of())
58 + self.properties.as_ref().map_or(0, |props| {
59 props
60 .iter()
61 .map(|(key, val)| key.allocated_bytes() + val.size_of())
62 .sum()
63 })
64 }
65}
66
67impl EstimatedJsonEncodedSizeOf for PulsarEvent {
68 fn estimated_json_encoded_size_of(&self) -> JsonSize {
69 self.event.estimated_json_encoded_size_of()
70 }
71}
72
73pub(crate) async fn healthcheck(config: PulsarSinkConfig) -> crate::Result<()> {
74 let client = config.create_pulsar_client().await?;
75 let topic = config.topic.render_string(&LogEvent::from_str_legacy(""))?;
76 client.lookup_topic(topic).await?;
77 Ok(())
78}
79
80impl PulsarSink {
81 pub(crate) fn new(
82 client: Pulsar<TokioExecutor>,
83 config: PulsarSinkConfig,
84 ) -> crate::Result<Self> {
85 let producer_opts = config.build_producer_options();
86 let transformer = config.encoding.transformer();
87 let serializer = config.encoding.build()?;
88 let encoder = Encoder::<()>::new(serializer);
89 let service = PulsarService::new(client, producer_opts, config.producer_name.clone());
90 let topic_template = config.topic.clone();
91
92 Ok(PulsarSink {
93 config,
94 transformer,
95 encoder,
96 service,
97 topic_template,
98 })
99 }
100
101 async fn run_inner(self: Box<Self>, input: BoxStream<'_, Event>) -> Result<(), ()> {
102 let service = ServiceBuilder::new().service(self.service);
103 let request_builder = PulsarRequestBuilder {
104 encoder: PulsarEncoder {
105 transformer: self.transformer.clone(),
106 encoder: self.encoder.clone(),
107 },
108 };
109 let sink = input
110 .filter_map(|event| {
111 std::future::ready(util::make_pulsar_event(
112 &self.topic_template,
113 &self.config,
114 event,
115 ))
116 })
117 .request_builder(default_request_builder_concurrency_limit(), request_builder)
118 .filter_map(|request| async move {
119 request
120 .map_err(|e| error!("Failed to build Pulsar request: {:?}.", e))
121 .ok()
122 })
123 .into_driver(service)
124 .protocol("tcp");
125
126 sink.run().await
127 }
128}
129
130#[async_trait]
131impl StreamSink<Event> for PulsarSink {
132 async fn run(self: Box<Self>, input: BoxStream<'_, Event>) -> Result<(), ()> {
133 self.run_inner(input).await
134 }
135}