1use rumqttc::{AsyncClient, EventLoop, MqttOptions};
2use snafu::Snafu;
3use vector_config_macros::configurable_component;
4use vector_lib::tls::{TlsEnableableConfig, TlsError};
5
6use crate::template::TemplateParseError;
7
8#[configurable_component]
10#[derive(Clone, Debug, Derivative)]
11#[derivative(Default)]
12#[serde(deny_unknown_fields)]
13pub struct MqttCommonConfig {
14    #[configurable(metadata(docs::examples = "mqtt.example.com", docs::examples = "127.0.0.1"))]
16    pub host: String,
17
18    #[configurable(derived)]
20    #[serde(default = "default_port")]
21    #[derivative(Default(value = "default_port()"))]
22    pub port: u16,
23
24    #[serde(default)]
26    #[configurable(derived)]
27    pub user: Option<String>,
28
29    #[serde(default)]
31    #[configurable(derived)]
32    pub password: Option<String>,
33
34    #[serde(default)]
36    #[configurable(derived)]
37    pub client_id: Option<String>,
38
39    #[serde(default = "default_keep_alive")]
41    #[derivative(Default(value = "default_keep_alive()"))]
42    pub keep_alive: u16,
43
44    #[serde(default = "default_max_packet_size")]
46    #[derivative(Default(value = "default_max_packet_size()"))]
47    pub max_packet_size: usize,
48
49    #[configurable(derived)]
51    pub tls: Option<TlsEnableableConfig>,
52}
53
/// Default value for the `port` setting: 1883.
///
/// Referenced by name from the serde and `derivative` default attributes on
/// the configuration struct, so the function name must stay as is.
const fn default_port() -> u16 {
    1883
}
57
/// Default value for the `keep_alive` setting: 60.
///
/// Referenced by name from the serde and `derivative` default attributes on
/// the configuration struct, so the function name must stay as is.
const fn default_keep_alive() -> u16 {
    60
}
61
/// Default value for the `max_packet_size` setting: `10 * 1024` (10240).
///
/// Referenced by name from the serde and `derivative` default attributes on
/// the configuration struct, so the function name must stay as is.
const fn default_max_packet_size() -> usize {
    10 * 1024
}
65
66#[derive(Debug, Snafu)]
68#[snafu(visibility(pub))]
69pub enum MqttError {
70    #[snafu(display("invalid topic template: {source}"))]
72    TopicTemplate {
73        source: TemplateParseError,
75    },
76    #[snafu(display("TLS error: {source}"))]
78    Tls {
79        source: TlsError,
81    },
82    #[snafu(display("MQTT configuration error: {source}"))]
84    Configuration {
85        source: ConfigurationError,
87    },
88}
89
90#[derive(Clone, Debug, Eq, PartialEq, Snafu)]
92pub enum ConfigurationError {
93    #[snafu(display("Client ID is not allowed to be empty."))]
95    EmptyClientId,
96    #[snafu(display("Username and password must be either both provided or both missing."))]
98    InvalidCredentials,
99    #[snafu(display(
101        "Client ID must be 1-23 characters long and must consist of only alphanumeric characters."
102    ))]
103    InvalidClientId,
104    #[snafu(display("Username and password must be either both or neither provided."))]
106    IncompleteCredentials,
107}
108
109#[derive(Clone)]
110pub struct MqttConnector {
112    pub options: MqttOptions,
114}
115
116impl MqttConnector {
117    pub const fn new(options: MqttOptions) -> Self {
119        Self { options }
120    }
121
122    pub fn connect(&self) -> (AsyncClient, EventLoop) {
124        let (client, eventloop) = AsyncClient::new(self.options.clone(), 1024);
125        (client, eventloop)
126    }
127
128    pub async fn healthcheck(&self) -> crate::Result<()> {
130        Ok(())
131    }
132}