1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
use async_trait::async_trait;
use bytes::Bytes;
use pulsar::{Error as PulsarError, Pulsar, TokioExecutor};
use serde::Serialize;
use snafu::Snafu;
use std::collections::HashMap;
use vrl::value::KeyString;

use super::{
    config::PulsarSinkConfig, encoder::PulsarEncoder, request_builder::PulsarRequestBuilder,
    service::PulsarService, util,
};
use crate::sinks::prelude::*;

/// Build-time errors for the Pulsar sink.
///
/// The `Snafu` derive generates context selectors for each variant; the
/// `visibility` attribute below makes those selectors usable crate-wide.
#[derive(Debug, Snafu)]
#[snafu(visibility(pub(crate)))]
pub(crate) enum BuildError {
    /// Wraps a [`PulsarError`]; displayed as a failure to create the Pulsar
    /// producer, followed by the source error.
    #[snafu(display("creating pulsar producer failed: {}", source))]
    CreatePulsarSink { source: PulsarError },
}

/// Sink that turns incoming events into Pulsar requests and drives them
/// through a [`PulsarService`].
///
/// Constructed via [`PulsarSink::new`] and executed through its
/// `StreamSink<Event>` implementation.
pub(crate) struct PulsarSink {
    /// Event transformer obtained from the sink's encoding configuration.
    transformer: Transformer,
    /// Encoder wrapping the serializer built from the encoding configuration.
    encoder: Encoder<()>,
    /// Service created from the Pulsar client, the producer options and the
    /// configured producer name.
    service: PulsarService<TokioExecutor>,
    /// Full sink configuration; handed to `util::make_pulsar_event` for every
    /// incoming event.
    config: PulsarSinkConfig,
    /// Copy of `config.topic`, rendered per event to pick the destination topic.
    topic_template: Template,
}

/// Stores the event together with the values extracted from it: the topic,
/// the key, the properties and the timestamp.
///
/// This is passed into the `RequestBuilder`, which then splits it into the
/// event itself and the metadata carrying those extracted values.
/// It has to be created before the request is built so that any events that
/// fail while rendering the templates can be filtered out first.
#[derive(Serialize)]
pub(super) struct PulsarEvent {
    /// The original event.
    pub(super) event: Event,
    /// Topic this event is destined for.
    pub(super) topic: String,
    /// Optional message key, as raw bytes.
    pub(super) key: Option<Bytes>,
    /// Optional message properties, keyed by name with raw byte values.
    pub(super) properties: Option<HashMap<KeyString, Bytes>>,
    /// Optional event timestamp; presumably milliseconds since the Unix epoch,
    /// going by the field name (confirm against `util::make_pulsar_event`).
    pub(super) timestamp_millis: Option<i64>,
}

impl EventCount for PulsarEvent {
    /// Reports how many events this value stands for.
    fn event_count(&self) -> usize {
        // Every `PulsarEvent` holds exactly one `Event`, so the count is fixed.
        const EVENTS_PER_PULSAR_EVENT: usize = 1;
        EVENTS_PER_PULSAR_EVENT
    }
}

impl ByteSizeOf for PulsarEvent {
    /// Sums the sizes of the event, the topic, the optional key and the
    /// optional properties. `timestamp_millis` is not part of the total.
    fn allocated_bytes(&self) -> usize {
        // A missing key contributes nothing.
        let key_bytes = match &self.key {
            Some(key) => key.size_of(),
            None => 0,
        };

        // Each property adds the allocated bytes of its name plus the full
        // size of its value; missing properties contribute nothing.
        let mut property_bytes: usize = 0;
        if let Some(properties) = &self.properties {
            for (name, value) in properties {
                property_bytes += name.allocated_bytes() + value.size_of();
            }
        }

        self.event.size_of() + self.topic.size_of() + key_bytes + property_bytes
    }
}

impl EstimatedJsonEncodedSizeOf for PulsarEvent {
    /// Delegates to the wrapped event; the topic, key, properties and
    /// timestamp are not included in the estimate.
    fn estimated_json_encoded_size_of(&self) -> JsonSize {
        let Self { event, .. } = self;
        event.estimated_json_encoded_size_of()
    }
}

/// Checks that the configured topic can be looked up on the Pulsar cluster.
///
/// Creates a client from `config`, renders the topic template against an
/// empty log event, and asks the client to look that topic up.
///
/// # Errors
///
/// Returns an error if the client cannot be created, if the topic template
/// fails to render against the empty event, or if the lookup fails.
pub(crate) async fn healthcheck(config: PulsarSinkConfig) -> crate::Result<()> {
    let pulsar = config.create_pulsar_client().await?;

    // There is no real event at healthcheck time, so the template is rendered
    // against an empty placeholder event.
    let rendered_topic = {
        let placeholder = LogEvent::from_str_legacy("");
        config.topic.render_string(&placeholder)?
    };

    pulsar.lookup_topic(rendered_topic).await?;
    Ok(())
}

impl PulsarSink {
    pub(crate) fn new(
        client: Pulsar<TokioExecutor>,
        config: PulsarSinkConfig,
    ) -> crate::Result<Self> {
        let producer_opts = config.build_producer_options();
        let transformer = config.encoding.transformer();
        let serializer = config.encoding.build()?;
        let encoder = Encoder::<()>::new(serializer);
        let service = PulsarService::new(client, producer_opts, config.producer_name.clone());
        let topic_template = config.topic.clone();

        Ok(PulsarSink {
            config,
            transformer,
            encoder,
            service,
            topic_template,
        })
    }

    async fn run_inner(self: Box<Self>, input: BoxStream<'_, Event>) -> Result<(), ()> {
        let service = ServiceBuilder::new().service(self.service);
        let request_builder = PulsarRequestBuilder {
            encoder: PulsarEncoder {
                transformer: self.transformer.clone(),
                encoder: self.encoder.clone(),
            },
        };
        let sink = input
            .filter_map(|event| {
                std::future::ready(util::make_pulsar_event(
                    &self.topic_template,
                    &self.config,
                    event,
                ))
            })
            .request_builder(default_request_builder_concurrency_limit(), request_builder)
            .filter_map(|request| async move {
                request
                    .map_err(|e| error!("Failed to build Pulsar request: {:?}.", e))
                    .ok()
            })
            .into_driver(service)
            .protocol("tcp");

        sink.run().await
    }
}

#[async_trait]
impl StreamSink<Event> for PulsarSink {
    async fn run(self: Box<Self>, input: BoxStream<'_, Event>) -> Result<(), ()> {
        self.run_inner(input).await
    }
}