vector/sinks/mqtt/
sink.rs1use async_trait::async_trait;
2use futures::{stream::BoxStream, StreamExt};
3
4use crate::common::mqtt::MqttConnector;
5use crate::internal_events::MqttConnectionError;
6use crate::sinks::prelude::*;
7
8use super::{
9 config::MqttQoS,
10 request_builder::{MqttEncoder, MqttRequestBuilder},
11 service::MqttService,
12 MqttSinkConfig,
13};
14
15pub struct MqttSink {
16 transformer: Transformer,
17 encoder: Encoder<()>,
18 connector: MqttConnector,
19 topic: Template,
20 quality_of_service: MqttQoS,
21 retain: bool,
22}
23
24pub(super) struct MqttEvent {
25 pub(super) topic: String,
26 pub(super) event: Event,
27}
28
29impl MqttSink {
30 pub fn new(config: &MqttSinkConfig, connector: MqttConnector) -> crate::Result<Self> {
31 let transformer = config.encoding.transformer();
32 let serializer = config.encoding.build()?;
33 let topic = config.topic.clone();
34 let encoder = Encoder::<()>::new(serializer);
35
36 Ok(Self {
37 transformer,
38 encoder,
39 connector,
40 topic,
41 quality_of_service: config.quality_of_service,
42 retain: config.retain,
43 })
44 }
45
46 fn make_mqtt_event(&self, event: Event) -> Option<MqttEvent> {
47 let topic = self
48 .topic
49 .render_string(&event)
50 .map_err(|missing_keys| {
51 emit!(TemplateRenderingError {
52 error: missing_keys,
53 field: Some("topic"),
54 drop_event: true,
55 })
56 })
57 .ok()?;
58
59 Some(MqttEvent { topic, event })
60 }
61
62 async fn run_inner(self: Box<Self>, input: BoxStream<'_, Event>) -> Result<(), ()> {
63 let (client, mut connection) = self.connector.connect();
64
65 tokio::spawn(async move {
67 loop {
68 match connection.poll().await {
74 Ok(_) => {}
75 Err(connection_error) => {
76 emit!(MqttConnectionError {
77 error: connection_error
78 });
79 }
80 }
81 }
82 });
83
84 let service = ServiceBuilder::new().service(MqttService {
85 client,
86 quality_of_service: self.quality_of_service,
87 retain: self.retain,
88 });
89
90 let request_builder = MqttRequestBuilder {
91 encoder: MqttEncoder {
92 encoder: self.encoder.clone(),
93 transformer: self.transformer.clone(),
94 },
95 };
96
97 input
98 .filter_map(|event| std::future::ready(self.make_mqtt_event(event)))
99 .request_builder(default_request_builder_concurrency_limit(), request_builder)
100 .filter_map(|request| async move {
101 match request {
102 Err(e) => {
103 error!("Failed to build MQTT request: {:?}.", e);
104 None
105 }
106 Ok(req) => Some(req),
107 }
108 })
109 .into_driver(service)
110 .protocol("mqtt")
111 .run()
112 .await
113 }
114}
115
116#[async_trait]
117impl StreamSink<Event> for MqttSink {
118 async fn run(mut self: Box<Self>, input: BoxStream<'_, Event>) -> Result<(), ()> {
119 self.run_inner(input).await
120 }
121}