vector/sinks/mqtt/
config.rs1use std::time::Duration;
2
3use rand::Rng;
4use rumqttc::{MqttOptions, QoS, TlsConfiguration, Transport};
5use snafu::ResultExt;
6use vector_lib::codecs::JsonSerializerConfig;
7
8use crate::template::Template;
9use crate::{
10 codecs::EncodingConfig,
11 common::mqtt::{
12 ConfigurationError, ConfigurationSnafu, MqttCommonConfig, MqttConnector, MqttError,
13 TlsSnafu,
14 },
15 config::{AcknowledgementsConfig, Input, SinkConfig, SinkContext},
16 sinks::{mqtt::sink::MqttSink, prelude::*, Healthcheck, VectorSink},
17 tls::MaybeTlsSettings,
18};
19
#[configurable_component(sink("mqtt"))]
/// Configuration for the `mqtt` sink.
#[derive(Clone, Debug)]
pub struct MqttSinkConfig {
    // Connection settings shared through `MqttCommonConfig`. They are flattened, so
    // they appear at the same level as the sink-specific options below. This file
    // reads `client_id`, `host`, `port`, `keep_alive`, `max_packet_size`, `user`,
    // `password` and `tls` from it when building the connector.
    #[serde(flatten)]
    pub common: MqttCommonConfig,

    /// Whether to request a clean MQTT session when connecting.
    ///
    /// Passed to the client options as the clean-session flag. Defaults to `false`.
    #[serde(default = "default_clean_session")]
    pub clean_session: bool,

    /// Topic to publish to, given as a template.
    // NOTE(review): the code that renders this template is not in this file;
    // presumably it is rendered per event by the sink — confirm.
    pub topic: Template,

    /// Whether published messages should be retained by the broker.
    ///
    /// Defaults to `false`.
    // NOTE(review): the publish call that consumes this flag is not in this file — confirm.
    #[serde(default = "default_retain")]
    pub retain: bool,

    // Description is derived from `EncodingConfig`. The `Default` impl uses the
    // JSON serializer.
    #[configurable(derived)]
    pub encoding: EncodingConfig,

    // Description is derived from `AcknowledgementsConfig`. Deserialization goes
    // through `bool_or_struct`, and the field is omitted from serialized output
    // when it equals its default.
    #[configurable(derived)]
    #[serde(
        default,
        deserialize_with = "crate::serde::bool_or_struct",
        skip_serializing_if = "crate::serde::is_default"
    )]
    pub acknowledgements: AcknowledgementsConfig,

    // Description is derived from `MqttQoS`. Falls back to `default_qos()`
    // (`AtLeastOnce`) when the option is not set.
    #[configurable(derived)]
    #[serde(default = "default_qos")]
    pub quality_of_service: MqttQoS,
}
53
#[configurable_component]
/// Quality of Service levels that can be selected for the `mqtt` sink.
///
/// Because of `rename_all = "lowercase"`, the values are written in configuration
/// as `atleastonce`, `atmostonce` and `exactlyonce`.
#[derive(Clone, Copy, Debug, Derivative)]
#[derivative(Default)]
#[serde(rename_all = "lowercase")]
// Every variant ends in `Once`, which is what this lint would otherwise flag.
#[allow(clippy::enum_variant_names)]
pub enum MqttQoS {
    /// At-least-once delivery. This is the default level.
    #[derivative(Default)]
    AtLeastOnce,

    /// At-most-once delivery.
    AtMostOnce,

    /// Exactly-once delivery.
    ExactlyOnce,
}
71
72impl From<MqttQoS> for QoS {
73 fn from(value: MqttQoS) -> Self {
74 match value {
75 MqttQoS::AtLeastOnce => QoS::AtLeastOnce,
76 MqttQoS::AtMostOnce => QoS::AtMostOnce,
77 MqttQoS::ExactlyOnce => QoS::ExactlyOnce,
78 }
79 }
80}
81
/// Serde default for the `clean_session` option: sessions are not cleaned unless
/// the user asks for it.
const fn default_clean_session() -> bool {
    false
}
85
86const fn default_qos() -> MqttQoS {
87 MqttQoS::AtLeastOnce
88}
89
/// Serde default for the `retain` option: messages are not flagged as retained
/// unless the user asks for it.
const fn default_retain() -> bool {
    false
}
93
94impl Default for MqttSinkConfig {
95 fn default() -> Self {
96 Self {
97 common: MqttCommonConfig::default(),
98 clean_session: default_clean_session(),
99
100 topic: Template::try_from("vector").expect("Cannot parse as a template"),
101 retain: default_retain(),
102 encoding: JsonSerializerConfig::default().into(),
103 acknowledgements: AcknowledgementsConfig::default(),
104 quality_of_service: MqttQoS::default(),
105 }
106 }
107}
108
// Provides config generation for this sink based on `MqttSinkConfig::default()`.
// NOTE(review): inferred from the macro name and from the `generate_config` test
// in this file — confirm against the macro definition.
impl_generate_config_from_default!(MqttSinkConfig);
110
111#[async_trait::async_trait]
112#[typetag::serde(name = "mqtt")]
113impl SinkConfig for MqttSinkConfig {
114 async fn build(&self, _cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> {
115 let connector = self.build_connector()?;
116 let sink = MqttSink::new(self, connector.clone())?;
117
118 Ok((
119 VectorSink::from_event_streamsink(sink),
120 Box::pin(async move { connector.healthcheck().await }),
121 ))
122 }
123
124 fn input(&self) -> Input {
125 Input::log()
126 }
127
128 fn acknowledgements(&self) -> &AcknowledgementsConfig {
129 &self.acknowledgements
130 }
131}
132
133impl MqttSinkConfig {
134 fn build_connector(&self) -> Result<MqttConnector, MqttError> {
135 let client_id = self.common.client_id.clone().unwrap_or_else(|| {
136 let hash = rand::rng()
137 .sample_iter(&rand_distr::Alphanumeric)
138 .take(6)
139 .map(char::from)
140 .collect::<String>();
141 format!("vectorSink{hash}")
142 });
143
144 if client_id.is_empty() {
145 return Err(ConfigurationError::EmptyClientId).context(ConfigurationSnafu);
146 }
147 let tls =
148 MaybeTlsSettings::from_config(self.common.tls.as_ref(), false).context(TlsSnafu)?;
149 let mut options = MqttOptions::new(&client_id, &self.common.host, self.common.port);
150 options.set_keep_alive(Duration::from_secs(self.common.keep_alive.into()));
151 options.set_max_packet_size(self.common.max_packet_size, self.common.max_packet_size);
152 options.set_clean_session(self.clean_session);
153 match (&self.common.user, &self.common.password) {
154 (Some(user), Some(password)) => {
155 options.set_credentials(user, password);
156 }
157 (None, None) => {}
158 _ => {
159 return Err(MqttError::Configuration {
160 source: ConfigurationError::InvalidCredentials,
161 });
162 }
163 }
164 if let Some(tls) = tls.tls() {
165 let ca = tls.authorities_pem().flatten().collect();
166 let client_auth = None;
167 let alpn = Some(vec!["mqtt".into()]);
168 options.set_transport(Transport::Tls(TlsConfiguration::Simple {
169 ca,
170 client_auth,
171 alpn,
172 }));
173 }
174 Ok(MqttConnector::new(options))
175 }
176}
177
#[cfg(test)]
mod test {
    use super::MqttSinkConfig;
    use crate::test_util::test_generate_config;

    /// Runs the shared config-generation check against the `mqtt` sink config.
    #[test]
    fn generate_config() {
        test_generate_config::<MqttSinkConfig>();
    }
}