use std::time::Duration;
use rand::Rng;
use rumqttc::{MqttOptions, QoS, TlsConfiguration, Transport};
use snafu::{ResultExt, Snafu};
use vector_lib::codecs::JsonSerializerConfig;
use crate::template::Template;
use crate::{
codecs::EncodingConfig,
config::{AcknowledgementsConfig, Input, SinkConfig, SinkContext},
sinks::{
mqtt::sink::{ConfigurationSnafu, MqttConnector, MqttError, MqttSink, TlsSnafu},
prelude::*,
Healthcheck, VectorSink,
},
tls::{MaybeTlsSettings, TlsEnableableConfig},
};
#[configurable_component(sink("mqtt"))]
#[derive(Clone, Debug)]
pub struct MqttSinkConfig {
#[configurable(metadata(docs::examples = "mqtt.example.com", docs::examples = "127.0.0.1"))]
pub host: String,
#[serde(default = "default_port")]
pub port: u16,
pub user: Option<String>,
pub password: Option<String>,
pub client_id: Option<String>,
#[serde(default = "default_keep_alive")]
pub keep_alive: u16,
#[serde(default = "default_clean_session")]
pub clean_session: bool,
#[configurable(derived)]
pub tls: Option<TlsEnableableConfig>,
pub topic: Template,
#[serde(default = "default_retain")]
pub retain: bool,
#[configurable(derived)]
pub encoding: EncodingConfig,
#[configurable(derived)]
#[serde(
default,
deserialize_with = "crate::serde::bool_or_struct",
skip_serializing_if = "crate::serde::is_default"
)]
pub acknowledgements: AcknowledgementsConfig,
#[configurable(derived)]
#[serde(default = "default_qos")]
pub quality_of_service: MqttQoS,
}
#[configurable_component]
#[derive(Clone, Copy, Debug, Derivative)]
#[derivative(Default)]
#[serde(rename_all = "lowercase")]
#[allow(clippy::enum_variant_names)]
pub enum MqttQoS {
#[derivative(Default)]
AtLeastOnce,
AtMostOnce,
ExactlyOnce,
}
impl From<MqttQoS> for QoS {
fn from(value: MqttQoS) -> Self {
match value {
MqttQoS::AtLeastOnce => QoS::AtLeastOnce,
MqttQoS::AtMostOnce => QoS::AtMostOnce,
MqttQoS::ExactlyOnce => QoS::ExactlyOnce,
}
}
}
const fn default_port() -> u16 {
1883
}
const fn default_keep_alive() -> u16 {
60
}
const fn default_clean_session() -> bool {
false
}
const fn default_qos() -> MqttQoS {
MqttQoS::AtLeastOnce
}
const fn default_retain() -> bool {
false
}
impl Default for MqttSinkConfig {
fn default() -> Self {
Self {
host: "localhost".into(),
port: default_port(),
user: None,
password: None,
client_id: None,
keep_alive: default_keep_alive(),
clean_session: default_clean_session(),
tls: None,
topic: Template::try_from("vector").expect("Cannot parse as a template"),
retain: default_retain(),
encoding: JsonSerializerConfig::default().into(),
acknowledgements: AcknowledgementsConfig::default(),
quality_of_service: MqttQoS::default(),
}
}
}
impl_generate_config_from_default!(MqttSinkConfig);
#[async_trait::async_trait]
#[typetag::serde(name = "mqtt")]
impl SinkConfig for MqttSinkConfig {
async fn build(&self, _cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> {
let connector = self.build_connector()?;
let sink = MqttSink::new(self, connector.clone())?;
Ok((
VectorSink::from_event_streamsink(sink),
Box::pin(async move { connector.healthcheck().await }),
))
}
fn input(&self) -> Input {
Input::log()
}
fn acknowledgements(&self) -> &AcknowledgementsConfig {
&self.acknowledgements
}
}
#[derive(Clone, Debug, Eq, PartialEq, Snafu)]
pub enum ConfigurationError {
#[snafu(display("Client ID is not allowed to be empty."))]
EmptyClientId,
#[snafu(display("Username and password must be either both provided or both missing."))]
InvalidCredentials,
}
impl MqttSinkConfig {
fn build_connector(&self) -> Result<MqttConnector, MqttError> {
let client_id = self.client_id.clone().unwrap_or_else(|| {
let hash = rand::thread_rng()
.sample_iter(&rand_distr::Alphanumeric)
.take(6)
.map(char::from)
.collect::<String>();
format!("vectorSink{hash}")
});
if client_id.is_empty() {
return Err(ConfigurationError::EmptyClientId).context(ConfigurationSnafu);
}
let tls = MaybeTlsSettings::from_config(&self.tls, false).context(TlsSnafu)?;
let mut options = MqttOptions::new(&client_id, &self.host, self.port);
options.set_keep_alive(Duration::from_secs(self.keep_alive.into()));
options.set_clean_session(self.clean_session);
match (&self.user, &self.password) {
(Some(user), Some(password)) => {
options.set_credentials(user, password);
}
(None, None) => {}
_ => {
return Err(MqttError::Configuration {
source: ConfigurationError::InvalidCredentials,
});
}
}
if let Some(tls) = tls.tls() {
let ca = tls.authorities_pem().flatten().collect();
let client_auth = None;
let alpn = Some(vec!["mqtt".into()]);
options.set_transport(Transport::Tls(TlsConfiguration::Simple {
ca,
client_auth,
alpn,
}));
}
MqttConnector::new(options, self.topic.to_string())
}
}
#[cfg(test)]
mod test {
use super::*;
#[test]
fn generate_config() {
crate::test_util::test_generate_config::<MqttSinkConfig>();
}
}