vector/sinks/mqtt/
config.rs

1use std::time::Duration;
2
3use rand::Rng;
4use rumqttc::{MqttOptions, QoS, TlsConfiguration, Transport};
5use snafu::ResultExt;
6use vector_lib::codecs::JsonSerializerConfig;
7
8use crate::{
9    codecs::EncodingConfig,
10    common::mqtt::{
11        ConfigurationError, ConfigurationSnafu, MqttCommonConfig, MqttConnector, MqttError,
12        TlsSnafu,
13    },
14    config::{AcknowledgementsConfig, Input, SinkConfig, SinkContext},
15    sinks::{Healthcheck, VectorSink, mqtt::sink::MqttSink, prelude::*},
16    template::Template,
17    tls::MaybeTlsSettings,
18};
19
20/// Configuration for the `mqtt` sink
21#[configurable_component(sink("mqtt"))]
22#[derive(Clone, Debug)]
23pub struct MqttSinkConfig {
24    #[serde(flatten)]
25    pub common: MqttCommonConfig,
26
27    /// If set to true, the MQTT session is cleaned on login.
28    #[serde(default = "default_clean_session")]
29    pub clean_session: bool,
30
31    /// MQTT publish topic (templates allowed)
32    pub topic: Template,
33
34    /// Whether the messages should be retained by the server
35    #[serde(default = "default_retain")]
36    pub retain: bool,
37
38    #[configurable(derived)]
39    pub encoding: EncodingConfig,
40
41    #[configurable(derived)]
42    #[serde(
43        default,
44        deserialize_with = "crate::serde::bool_or_struct",
45        skip_serializing_if = "crate::serde::is_default"
46    )]
47    pub acknowledgements: AcknowledgementsConfig,
48
49    #[configurable(derived)]
50    #[serde(default = "default_qos")]
51    pub quality_of_service: MqttQoS,
52}
53
54/// Supported Quality of Service types for MQTT.
55#[configurable_component]
56#[derive(Clone, Copy, Debug, Default)]
57#[serde(rename_all = "lowercase")]
58#[allow(clippy::enum_variant_names)]
59pub enum MqttQoS {
60    /// AtLeastOnce.
61    #[default]
62    AtLeastOnce,
63
64    /// AtMostOnce.
65    AtMostOnce,
66
67    /// ExactlyOnce.
68    ExactlyOnce,
69}
70
71impl From<MqttQoS> for QoS {
72    fn from(value: MqttQoS) -> Self {
73        match value {
74            MqttQoS::AtLeastOnce => QoS::AtLeastOnce,
75            MqttQoS::AtMostOnce => QoS::AtMostOnce,
76            MqttQoS::ExactlyOnce => QoS::ExactlyOnce,
77        }
78    }
79}
80
81const fn default_clean_session() -> bool {
82    false
83}
84
85const fn default_qos() -> MqttQoS {
86    MqttQoS::AtLeastOnce
87}
88
89const fn default_retain() -> bool {
90    false
91}
92
93impl Default for MqttSinkConfig {
94    fn default() -> Self {
95        Self {
96            common: MqttCommonConfig::default(),
97            clean_session: default_clean_session(),
98
99            topic: Template::try_from("vector").expect("Cannot parse as a template"),
100            retain: default_retain(),
101            encoding: JsonSerializerConfig::default().into(),
102            acknowledgements: AcknowledgementsConfig::default(),
103            quality_of_service: MqttQoS::default(),
104        }
105    }
106}
107
108impl_generate_config_from_default!(MqttSinkConfig);
109
110#[async_trait::async_trait]
111#[typetag::serde(name = "mqtt")]
112impl SinkConfig for MqttSinkConfig {
113    async fn build(&self, _cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> {
114        let connector = self.build_connector()?;
115        let sink = MqttSink::new(self, connector.clone())?;
116
117        Ok((
118            VectorSink::from_event_streamsink(sink),
119            Box::pin(async move { connector.healthcheck().await }),
120        ))
121    }
122
123    fn input(&self) -> Input {
124        Input::log()
125    }
126
127    fn acknowledgements(&self) -> &AcknowledgementsConfig {
128        &self.acknowledgements
129    }
130}
131
132impl MqttSinkConfig {
133    fn build_connector(&self) -> Result<MqttConnector, MqttError> {
134        let client_id = self.common.client_id.clone().unwrap_or_else(|| {
135            let hash = rand::rng()
136                .sample_iter(&rand_distr::Alphanumeric)
137                .take(6)
138                .map(char::from)
139                .collect::<String>();
140            format!("vectorSink{hash}")
141        });
142
143        if client_id.is_empty() {
144            return Err(ConfigurationError::EmptyClientId).context(ConfigurationSnafu);
145        }
146        let tls =
147            MaybeTlsSettings::from_config(self.common.tls.as_ref(), false).context(TlsSnafu)?;
148        let mut options = MqttOptions::new(&client_id, &self.common.host, self.common.port);
149        options.set_keep_alive(Duration::from_secs(self.common.keep_alive.into()));
150        options.set_max_packet_size(self.common.max_packet_size, self.common.max_packet_size);
151        options.set_clean_session(self.clean_session);
152        match (&self.common.user, &self.common.password) {
153            (Some(user), Some(password)) => {
154                options.set_credentials(user, password);
155            }
156            (None, None) => {}
157            _ => {
158                return Err(MqttError::Configuration {
159                    source: ConfigurationError::InvalidCredentials,
160                });
161            }
162        }
163        if let Some(tls) = tls.tls() {
164            let ca = tls.authorities_pem().flatten().collect();
165            let client_auth = tls.identity_pem();
166            let alpn = Some(vec!["mqtt".into()]);
167            options.set_transport(Transport::Tls(TlsConfiguration::Simple {
168                ca,
169                client_auth,
170                alpn,
171            }));
172        }
173        Ok(MqttConnector::new(options))
174    }
175}
176
177#[cfg(test)]
178mod test {
179    use super::*;
180
181    #[test]
182    fn generate_config() {
183        crate::test_util::test_generate_config::<MqttSinkConfig>();
184    }
185}