vector/sinks/mqtt/
sink.rs1use async_trait::async_trait;
2use futures::{StreamExt, stream::BoxStream};
3
4use super::{
5 MqttSinkConfig,
6 config::MqttQoS,
7 request_builder::{MqttEncoder, MqttRequestBuilder},
8 service::MqttService,
9};
10use crate::{common::mqtt::MqttConnector, internal_events::MqttConnectionError, sinks::prelude::*};
11
/// Sink state assembled by [`MqttSink::new`] from an `MqttSinkConfig` and an
/// `MqttConnector`.
pub struct MqttSink {
    /// Transformer obtained from the config's `encoding`; a clone is handed to
    /// the request encoder when the sink runs.
    transformer: Transformer,
    /// Encoder wrapping the serializer built from the config's `encoding`.
    encoder: Encoder<()>,
    /// Yields the client / connection pair (via `connect`) when the sink runs.
    connector: MqttConnector,
    /// Template rendered against each event to produce the topic string.
    topic: Template,
    /// Copied from the config and passed on to `MqttService`.
    quality_of_service: MqttQoS,
    /// Copied from the config and passed on to `MqttService`.
    retain: bool,
}
20
/// An event paired with the topic string rendered for it by the sink's topic
/// template.
pub(super) struct MqttEvent {
    /// Topic rendered from the sink's `Template` for this particular event.
    pub(super) topic: String,
    /// The event the topic was rendered from.
    pub(super) event: Event,
}
25
26impl MqttSink {
27 pub fn new(config: &MqttSinkConfig, connector: MqttConnector) -> crate::Result<Self> {
28 let transformer = config.encoding.transformer();
29 let serializer = config.encoding.build()?;
30 let topic = config.topic.clone();
31 let encoder = Encoder::<()>::new(serializer);
32
33 Ok(Self {
34 transformer,
35 encoder,
36 connector,
37 topic,
38 quality_of_service: config.quality_of_service,
39 retain: config.retain,
40 })
41 }
42
43 fn make_mqtt_event(&self, event: Event) -> Option<MqttEvent> {
44 let topic = self
45 .topic
46 .render_string(&event)
47 .map_err(|missing_keys| {
48 emit!(TemplateRenderingError {
49 error: missing_keys,
50 field: Some("topic"),
51 drop_event: true,
52 })
53 })
54 .ok()?;
55
56 Some(MqttEvent { topic, event })
57 }
58
59 async fn run_inner(self: Box<Self>, input: BoxStream<'_, Event>) -> Result<(), ()> {
60 let (client, mut connection) = self.connector.connect();
61
62 tokio::spawn(async move {
64 loop {
65 match connection.poll().await {
71 Ok(_) => {}
72 Err(connection_error) => {
73 emit!(MqttConnectionError {
74 error: connection_error
75 });
76 }
77 }
78 }
79 });
80
81 let service = ServiceBuilder::new().service(MqttService {
82 client,
83 quality_of_service: self.quality_of_service,
84 retain: self.retain,
85 });
86
87 let request_builder = MqttRequestBuilder {
88 encoder: MqttEncoder {
89 encoder: self.encoder.clone(),
90 transformer: self.transformer.clone(),
91 },
92 };
93
94 input
95 .filter_map(|event| std::future::ready(self.make_mqtt_event(event)))
96 .request_builder(default_request_builder_concurrency_limit(), request_builder)
97 .filter_map(|request| async move {
98 match request {
99 Err(e) => {
100 error!("Failed to build MQTT request: {:?}.", e);
101 None
102 }
103 Ok(req) => Some(req),
104 }
105 })
106 .into_driver(service)
107 .protocol("mqtt")
108 .run()
109 .await
110 }
111}
112
113#[async_trait]
114impl StreamSink<Event> for MqttSink {
115 async fn run(mut self: Box<Self>, input: BoxStream<'_, Event>) -> Result<(), ()> {
116 self.run_inner(input).await
117 }
118}