vector/sinks/mqtt/
sink.rs

1use async_trait::async_trait;
2use futures::{StreamExt, stream::BoxStream};
3
4use super::{
5    MqttSinkConfig,
6    config::MqttQoS,
7    request_builder::{MqttEncoder, MqttRequestBuilder},
8    service::MqttService,
9};
10use crate::{common::mqtt::MqttConnector, internal_events::MqttConnectionError, sinks::prelude::*};
11
12pub struct MqttSink {
13    transformer: Transformer,
14    encoder: Encoder<()>,
15    connector: MqttConnector,
16    topic: Template,
17    quality_of_service: MqttQoS,
18    retain: bool,
19}
20
21pub(super) struct MqttEvent {
22    pub(super) topic: String,
23    pub(super) event: Event,
24}
25
26impl MqttSink {
27    pub fn new(config: &MqttSinkConfig, connector: MqttConnector) -> crate::Result<Self> {
28        let transformer = config.encoding.transformer();
29        let serializer = config.encoding.build()?;
30        let topic = config.topic.clone();
31        let encoder = Encoder::<()>::new(serializer);
32
33        Ok(Self {
34            transformer,
35            encoder,
36            connector,
37            topic,
38            quality_of_service: config.quality_of_service,
39            retain: config.retain,
40        })
41    }
42
43    fn make_mqtt_event(&self, event: Event) -> Option<MqttEvent> {
44        let topic = self
45            .topic
46            .render_string(&event)
47            .map_err(|missing_keys| {
48                emit!(TemplateRenderingError {
49                    error: missing_keys,
50                    field: Some("topic"),
51                    drop_event: true,
52                })
53            })
54            .ok()?;
55
56        Some(MqttEvent { topic, event })
57    }
58
59    async fn run_inner(self: Box<Self>, input: BoxStream<'_, Event>) -> Result<(), ()> {
60        let (client, mut connection) = self.connector.connect();
61
62        // This is necessary to keep the mqtt event loop moving forward.
63        tokio::spawn(async move {
64            loop {
65                // If an error is returned here there is currently no way to tie this back
66                // to the event that was posted which means we can't accurately provide
67                // delivery guarantees.
68                // We need this issue resolved first:
69                // https://github.com/bytebeamio/rumqtt/issues/349
70                match connection.poll().await {
71                    Ok(_) => {}
72                    Err(connection_error) => {
73                        emit!(MqttConnectionError {
74                            error: connection_error
75                        });
76                    }
77                }
78            }
79        });
80
81        let service = ServiceBuilder::new().service(MqttService {
82            client,
83            quality_of_service: self.quality_of_service,
84            retain: self.retain,
85        });
86
87        let request_builder = MqttRequestBuilder {
88            encoder: MqttEncoder {
89                encoder: self.encoder.clone(),
90                transformer: self.transformer.clone(),
91            },
92        };
93
94        input
95            .filter_map(|event| std::future::ready(self.make_mqtt_event(event)))
96            .request_builder(default_request_builder_concurrency_limit(), request_builder)
97            .filter_map(|request| async move {
98                match request {
99                    Err(e) => {
100                        error!("Failed to build MQTT request: {:?}.", e);
101                        None
102                    }
103                    Ok(req) => Some(req),
104                }
105            })
106            .into_driver(service)
107            .protocol("mqtt")
108            .run()
109            .await
110    }
111}
112
113#[async_trait]
114impl StreamSink<Event> for MqttSink {
115    async fn run(mut self: Box<Self>, input: BoxStream<'_, Event>) -> Result<(), ()> {
116        self.run_inner(input).await
117    }
118}