// vector/sinks/mqtt/config.rs

1use std::time::Duration;
2
3use rand::Rng;
4use rumqttc::{MqttOptions, QoS, TlsConfiguration, Transport};
5use snafu::ResultExt;
6use vector_lib::codecs::JsonSerializerConfig;
7
8use crate::template::Template;
9use crate::{
10    codecs::EncodingConfig,
11    common::mqtt::{
12        ConfigurationError, ConfigurationSnafu, MqttCommonConfig, MqttConnector, MqttError,
13        TlsSnafu,
14    },
15    config::{AcknowledgementsConfig, Input, SinkConfig, SinkContext},
16    sinks::{mqtt::sink::MqttSink, prelude::*, Healthcheck, VectorSink},
17    tls::MaybeTlsSettings,
18};
19
/// Configuration for the `mqtt` sink
// NOTE(review): the `///` text on this type and its fields is presumably consumed by
// `configurable_component` when generating the component schema/docs — confirm before
// rewording any of it. Explanatory notes for maintainers are therefore kept in `//` comments.
#[configurable_component(sink("mqtt"))]
#[derive(Clone, Debug)]
pub struct MqttSinkConfig {
    // Shared MQTT settings. Within this file the connector reads `client_id`, `host`, `port`,
    // `keep_alive`, `max_packet_size`, `user`, `password` and `tls` from it. `flatten` puts
    // these keys at the same level as the sink-specific options below.
    #[serde(flatten)]
    pub common: MqttCommonConfig,

    /// If set to true, the MQTT session is cleaned on login.
    // Forwarded verbatim to `MqttOptions::set_clean_session`; defaults to `false`.
    #[serde(default = "default_clean_session")]
    pub clean_session: bool,

    /// MQTT publish topic (templates allowed)
    // Required: there is no serde default, although `Default` uses the literal "vector".
    pub topic: Template,

    /// Whether the messages should be retained by the server
    // Defaults to `false` via `default_retain`.
    #[serde(default = "default_retain")]
    pub retain: bool,

    // Required: there is no serde default, although `Default` uses the JSON serializer.
    #[configurable(derived)]
    pub encoding: EncodingConfig,

    // Optional. Judging by the helper names, this accepts either a bare bool or a full
    // struct and is omitted from serialized output when left at its default — TODO confirm
    // against `crate::serde`.
    #[configurable(derived)]
    #[serde(
        default,
        deserialize_with = "crate::serde::bool_or_struct",
        skip_serializing_if = "crate::serde::is_default"
    )]
    pub acknowledgements: AcknowledgementsConfig,

    // Defaults to `MqttQoS::AtLeastOnce` via `default_qos`.
    #[configurable(derived)]
    #[serde(default = "default_qos")]
    pub quality_of_service: MqttQoS,
}
53
/// Supported Quality of Service types for MQTT.
// Configuration-side mirror of `rumqttc::QoS`; the `From<MqttQoS> for QoS` impl in this
// file maps each variant to the `rumqttc` variant of the same name. With
// `rename_all = "lowercase"` the serialized names are `atleastonce`, `atmostonce` and
// `exactlyonce`.
#[configurable_component]
#[derive(Clone, Copy, Debug, Derivative)]
#[derivative(Default)]
#[serde(rename_all = "lowercase")]
#[allow(clippy::enum_variant_names)]
pub enum MqttQoS {
    /// AtLeastOnce.
    // Marked as the `Default` variant; also the value returned by `default_qos`.
    // Presumably MQTT QoS level 1 — confirm against the MQTT specification.
    #[derivative(Default)]
    AtLeastOnce,

    /// AtMostOnce.
    // Presumably MQTT QoS level 0 — confirm against the MQTT specification.
    AtMostOnce,

    /// ExactlyOnce.
    // Presumably MQTT QoS level 2 — confirm against the MQTT specification.
    ExactlyOnce,
}
71
72impl From<MqttQoS> for QoS {
73    fn from(value: MqttQoS) -> Self {
74        match value {
75            MqttQoS::AtLeastOnce => QoS::AtLeastOnce,
76            MqttQoS::AtMostOnce => QoS::AtMostOnce,
77            MqttQoS::ExactlyOnce => QoS::ExactlyOnce,
78        }
79    }
80}
81
/// Serde default for `clean_session`: do not ask for a clean session unless configured.
const fn default_clean_session() -> bool {
    false
}
85
86const fn default_qos() -> MqttQoS {
87    MqttQoS::AtLeastOnce
88}
89
/// Serde default for `retain`: published messages are not flagged as retained.
const fn default_retain() -> bool {
    false
}
93
94impl Default for MqttSinkConfig {
95    fn default() -> Self {
96        Self {
97            common: MqttCommonConfig::default(),
98            clean_session: default_clean_session(),
99
100            topic: Template::try_from("vector").expect("Cannot parse as a template"),
101            retain: default_retain(),
102            encoding: JsonSerializerConfig::default().into(),
103            acknowledgements: AcknowledgementsConfig::default(),
104            quality_of_service: MqttQoS::default(),
105        }
106    }
107}
108
109impl_generate_config_from_default!(MqttSinkConfig);
110
111#[async_trait::async_trait]
112#[typetag::serde(name = "mqtt")]
113impl SinkConfig for MqttSinkConfig {
114    async fn build(&self, _cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> {
115        let connector = self.build_connector()?;
116        let sink = MqttSink::new(self, connector.clone())?;
117
118        Ok((
119            VectorSink::from_event_streamsink(sink),
120            Box::pin(async move { connector.healthcheck().await }),
121        ))
122    }
123
124    fn input(&self) -> Input {
125        Input::log()
126    }
127
128    fn acknowledgements(&self) -> &AcknowledgementsConfig {
129        &self.acknowledgements
130    }
131}
132
133impl MqttSinkConfig {
134    fn build_connector(&self) -> Result<MqttConnector, MqttError> {
135        let client_id = self.common.client_id.clone().unwrap_or_else(|| {
136            let hash = rand::rng()
137                .sample_iter(&rand_distr::Alphanumeric)
138                .take(6)
139                .map(char::from)
140                .collect::<String>();
141            format!("vectorSink{hash}")
142        });
143
144        if client_id.is_empty() {
145            return Err(ConfigurationError::EmptyClientId).context(ConfigurationSnafu);
146        }
147        let tls =
148            MaybeTlsSettings::from_config(self.common.tls.as_ref(), false).context(TlsSnafu)?;
149        let mut options = MqttOptions::new(&client_id, &self.common.host, self.common.port);
150        options.set_keep_alive(Duration::from_secs(self.common.keep_alive.into()));
151        options.set_max_packet_size(self.common.max_packet_size, self.common.max_packet_size);
152        options.set_clean_session(self.clean_session);
153        match (&self.common.user, &self.common.password) {
154            (Some(user), Some(password)) => {
155                options.set_credentials(user, password);
156            }
157            (None, None) => {}
158            _ => {
159                return Err(MqttError::Configuration {
160                    source: ConfigurationError::InvalidCredentials,
161                });
162            }
163        }
164        if let Some(tls) = tls.tls() {
165            let ca = tls.authorities_pem().flatten().collect();
166            let client_auth = None;
167            let alpn = Some(vec!["mqtt".into()]);
168            options.set_transport(Transport::Tls(TlsConfiguration::Simple {
169                ca,
170                client_auth,
171                alpn,
172            }));
173        }
174        Ok(MqttConnector::new(options))
175    }
176}
177
#[cfg(test)]
mod test {
    use super::MqttSinkConfig;

    /// Runs the shared generated-config check against this sink's configuration type.
    #[test]
    fn generate_config() {
        crate::test_util::test_generate_config::<MqttSinkConfig>();
    }
}
186}