vector/sinks/mqtt/
sink.rs

1use async_trait::async_trait;
2use futures::{stream::BoxStream, StreamExt};
3
4use crate::common::mqtt::MqttConnector;
5use crate::internal_events::MqttConnectionError;
6use crate::sinks::prelude::*;
7
8use super::{
9    config::MqttQoS,
10    request_builder::{MqttEncoder, MqttRequestBuilder},
11    service::MqttService,
12    MqttSinkConfig,
13};
14
15pub struct MqttSink {
16    transformer: Transformer,
17    encoder: Encoder<()>,
18    connector: MqttConnector,
19    topic: Template,
20    quality_of_service: MqttQoS,
21    retain: bool,
22}
23
24pub(super) struct MqttEvent {
25    pub(super) topic: String,
26    pub(super) event: Event,
27}
28
29impl MqttSink {
30    pub fn new(config: &MqttSinkConfig, connector: MqttConnector) -> crate::Result<Self> {
31        let transformer = config.encoding.transformer();
32        let serializer = config.encoding.build()?;
33        let topic = config.topic.clone();
34        let encoder = Encoder::<()>::new(serializer);
35
36        Ok(Self {
37            transformer,
38            encoder,
39            connector,
40            topic,
41            quality_of_service: config.quality_of_service,
42            retain: config.retain,
43        })
44    }
45
46    fn make_mqtt_event(&self, event: Event) -> Option<MqttEvent> {
47        let topic = self
48            .topic
49            .render_string(&event)
50            .map_err(|missing_keys| {
51                emit!(TemplateRenderingError {
52                    error: missing_keys,
53                    field: Some("topic"),
54                    drop_event: true,
55                })
56            })
57            .ok()?;
58
59        Some(MqttEvent { topic, event })
60    }
61
62    async fn run_inner(self: Box<Self>, input: BoxStream<'_, Event>) -> Result<(), ()> {
63        let (client, mut connection) = self.connector.connect();
64
65        // This is necessary to keep the mqtt event loop moving forward.
66        tokio::spawn(async move {
67            loop {
68                // If an error is returned here there is currently no way to tie this back
69                // to the event that was posted which means we can't accurately provide
70                // delivery guarantees.
71                // We need this issue resolved first:
72                // https://github.com/bytebeamio/rumqtt/issues/349
73                match connection.poll().await {
74                    Ok(_) => {}
75                    Err(connection_error) => {
76                        emit!(MqttConnectionError {
77                            error: connection_error
78                        });
79                    }
80                }
81            }
82        });
83
84        let service = ServiceBuilder::new().service(MqttService {
85            client,
86            quality_of_service: self.quality_of_service,
87            retain: self.retain,
88        });
89
90        let request_builder = MqttRequestBuilder {
91            encoder: MqttEncoder {
92                encoder: self.encoder.clone(),
93                transformer: self.transformer.clone(),
94            },
95        };
96
97        input
98            .filter_map(|event| std::future::ready(self.make_mqtt_event(event)))
99            .request_builder(default_request_builder_concurrency_limit(), request_builder)
100            .filter_map(|request| async move {
101                match request {
102                    Err(e) => {
103                        error!("Failed to build MQTT request: {:?}.", e);
104                        None
105                    }
106                    Ok(req) => Some(req),
107                }
108            })
109            .into_driver(service)
110            .protocol("mqtt")
111            .run()
112            .await
113    }
114}
115
116#[async_trait]
117impl StreamSink<Event> for MqttSink {
118    async fn run(mut self: Box<Self>, input: BoxStream<'_, Event>) -> Result<(), ()> {
119        self.run_inner(input).await
120    }
121}