vector/sinks/mqtt/
config.rs1use std::time::Duration;
2
3use rand::Rng;
4use rumqttc::{MqttOptions, QoS, TlsConfiguration, Transport};
5use snafu::ResultExt;
6use vector_lib::codecs::JsonSerializerConfig;
7
8use crate::{
9 codecs::EncodingConfig,
10 common::mqtt::{
11 ConfigurationError, ConfigurationSnafu, MqttCommonConfig, MqttConnector, MqttError,
12 TlsSnafu,
13 },
14 config::{AcknowledgementsConfig, Input, SinkConfig, SinkContext},
15 sinks::{Healthcheck, VectorSink, mqtt::sink::MqttSink, prelude::*},
16 template::Template,
17 tls::MaybeTlsSettings,
18};
19
20#[configurable_component(sink("mqtt"))]
22#[derive(Clone, Debug)]
23pub struct MqttSinkConfig {
24 #[serde(flatten)]
25 pub common: MqttCommonConfig,
26
27 #[serde(default = "default_clean_session")]
29 pub clean_session: bool,
30
31 pub topic: Template,
33
34 #[serde(default = "default_retain")]
36 pub retain: bool,
37
38 #[configurable(derived)]
39 pub encoding: EncodingConfig,
40
41 #[configurable(derived)]
42 #[serde(
43 default,
44 deserialize_with = "crate::serde::bool_or_struct",
45 skip_serializing_if = "crate::serde::is_default"
46 )]
47 pub acknowledgements: AcknowledgementsConfig,
48
49 #[configurable(derived)]
50 #[serde(default = "default_qos")]
51 pub quality_of_service: MqttQoS,
52}
53
54#[configurable_component]
56#[derive(Clone, Copy, Debug, Default)]
57#[serde(rename_all = "lowercase")]
58#[allow(clippy::enum_variant_names)]
59pub enum MqttQoS {
60 #[default]
62 AtLeastOnce,
63
64 AtMostOnce,
66
67 ExactlyOnce,
69}
70
71impl From<MqttQoS> for QoS {
72 fn from(value: MqttQoS) -> Self {
73 match value {
74 MqttQoS::AtLeastOnce => QoS::AtLeastOnce,
75 MqttQoS::AtMostOnce => QoS::AtMostOnce,
76 MqttQoS::ExactlyOnce => QoS::ExactlyOnce,
77 }
78 }
79}
80
81const fn default_clean_session() -> bool {
82 false
83}
84
85const fn default_qos() -> MqttQoS {
86 MqttQoS::AtLeastOnce
87}
88
89const fn default_retain() -> bool {
90 false
91}
92
93impl Default for MqttSinkConfig {
94 fn default() -> Self {
95 Self {
96 common: MqttCommonConfig::default(),
97 clean_session: default_clean_session(),
98
99 topic: Template::try_from("vector").expect("Cannot parse as a template"),
100 retain: default_retain(),
101 encoding: JsonSerializerConfig::default().into(),
102 acknowledgements: AcknowledgementsConfig::default(),
103 quality_of_service: MqttQoS::default(),
104 }
105 }
106}
107
108impl_generate_config_from_default!(MqttSinkConfig);
109
110#[async_trait::async_trait]
111#[typetag::serde(name = "mqtt")]
112impl SinkConfig for MqttSinkConfig {
113 async fn build(&self, _cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> {
114 let connector = self.build_connector()?;
115 let sink = MqttSink::new(self, connector.clone())?;
116
117 Ok((
118 VectorSink::from_event_streamsink(sink),
119 Box::pin(async move { connector.healthcheck().await }),
120 ))
121 }
122
123 fn input(&self) -> Input {
124 Input::log()
125 }
126
127 fn acknowledgements(&self) -> &AcknowledgementsConfig {
128 &self.acknowledgements
129 }
130}
131
132impl MqttSinkConfig {
133 fn build_connector(&self) -> Result<MqttConnector, MqttError> {
134 let client_id = self.common.client_id.clone().unwrap_or_else(|| {
135 let hash = rand::rng()
136 .sample_iter(&rand_distr::Alphanumeric)
137 .take(6)
138 .map(char::from)
139 .collect::<String>();
140 format!("vectorSink{hash}")
141 });
142
143 if client_id.is_empty() {
144 return Err(ConfigurationError::EmptyClientId).context(ConfigurationSnafu);
145 }
146 let tls =
147 MaybeTlsSettings::from_config(self.common.tls.as_ref(), false).context(TlsSnafu)?;
148 let mut options = MqttOptions::new(&client_id, &self.common.host, self.common.port);
149 options.set_keep_alive(Duration::from_secs(self.common.keep_alive.into()));
150 options.set_max_packet_size(self.common.max_packet_size, self.common.max_packet_size);
151 options.set_clean_session(self.clean_session);
152 match (&self.common.user, &self.common.password) {
153 (Some(user), Some(password)) => {
154 options.set_credentials(user, password);
155 }
156 (None, None) => {}
157 _ => {
158 return Err(MqttError::Configuration {
159 source: ConfigurationError::InvalidCredentials,
160 });
161 }
162 }
163 if let Some(tls) = tls.tls() {
164 let ca = tls.authorities_pem().flatten().collect();
165 let client_auth = tls.identity_pem();
166 let alpn = Some(vec!["mqtt".into()]);
167 options.set_transport(Transport::Tls(TlsConfiguration::Simple {
168 ca,
169 client_auth,
170 alpn,
171 }));
172 }
173 Ok(MqttConnector::new(options))
174 }
175}
176
177#[cfg(test)]
178mod test {
179 use super::*;
180
181 #[test]
182 fn generate_config() {
183 crate::test_util::test_generate_config::<MqttSinkConfig>();
184 }
185}