1use rumqttc::{AsyncClient, EventLoop, MqttOptions};
2use snafu::Snafu;
3use vector_config_macros::configurable_component;
4use vector_lib::tls::{TlsEnableableConfig, TlsError};
5
6use crate::template::TemplateParseError;
7
#[configurable_component]
/// Common MQTT connection settings.
#[derive(Clone, Debug, Derivative)]
#[derivative(Default)]
#[serde(deny_unknown_fields)]
pub struct MqttCommonConfig {
    /// Address of the MQTT server (a domain name or an IP address).
    #[configurable(metadata(docs::examples = "mqtt.example.com", docs::examples = "127.0.0.1"))]
    pub host: String,

    /// Port of the MQTT server to connect to.
    #[configurable(derived)]
    #[serde(default = "default_port")]
    #[derivative(Default(value = "default_port()"))]
    pub port: u16,

    /// MQTT username.
    #[serde(default)]
    #[configurable(derived)]
    pub user: Option<String>,

    /// MQTT password.
    #[serde(default)]
    #[configurable(derived)]
    pub password: Option<String>,

    /// MQTT client ID.
    #[serde(default)]
    #[configurable(derived)]
    pub client_id: Option<String>,

    /// Connection keep-alive interval.
    // NOTE(review): presumably expressed in seconds -- confirm where this value is
    // applied to the client options.
    #[serde(default = "default_keep_alive")]
    #[derivative(Default(value = "default_keep_alive()"))]
    pub keep_alive: u16,

    /// Maximum packet size.
    // NOTE(review): presumably expressed in bytes -- confirm where this value is
    // applied to the client options.
    #[serde(default = "default_max_packet_size")]
    #[derivative(Default(value = "default_max_packet_size()"))]
    pub max_packet_size: usize,

    /// TLS configuration.
    #[configurable(derived)]
    pub tls: Option<TlsEnableableConfig>,
}
53
/// Returns the port that is used when the configuration does not set `port`.
///
/// Referenced by the serde and `Derivative` defaults of `MqttCommonConfig::port`.
const fn default_port() -> u16 {
    const DEFAULT_PORT: u16 = 1883;
    DEFAULT_PORT
}
57
/// Returns the keep-alive value that is used when the configuration does not set
/// `keep_alive`.
///
/// Referenced by the serde and `Derivative` defaults of `MqttCommonConfig::keep_alive`.
const fn default_keep_alive() -> u16 {
    const DEFAULT_KEEP_ALIVE: u16 = 60;
    DEFAULT_KEEP_ALIVE
}
61
/// Returns the maximum packet size that is used when the configuration does not set
/// `max_packet_size`.
///
/// Referenced by the serde and `Derivative` defaults of
/// `MqttCommonConfig::max_packet_size`.
const fn default_max_packet_size() -> usize {
    const KIB: usize = 1024;
    10 * KIB
}
65
66#[derive(Debug, Snafu)]
68#[snafu(visibility(pub))]
69pub enum MqttError {
70 #[snafu(display("invalid topic template: {source}"))]
72 TopicTemplate {
73 source: TemplateParseError,
75 },
76 #[snafu(display("TLS error: {source}"))]
78 Tls {
79 source: TlsError,
81 },
82 #[snafu(display("MQTT configuration error: {source}"))]
84 Configuration {
85 source: ConfigurationError,
87 },
88}
89
/// Reasons for which MQTT connection settings can be rejected.
#[derive(Clone, Debug, Eq, PartialEq, Snafu)]
pub enum ConfigurationError {
    /// The client ID was empty.
    #[snafu(display("Client ID is not allowed to be empty."))]
    EmptyClientId,
    /// Only one of username and password was supplied.
    // NOTE(review): this variant and `IncompleteCredentials` below describe the same
    // condition with slightly different wording. Both are public, so neither can be
    // dropped without breaking callers -- confirm which one is actually constructed.
    #[snafu(display("Username and password must be either both provided or both missing."))]
    InvalidCredentials,
    /// The client ID violated the length or character-set rule quoted in the message.
    #[snafu(display(
        "Client ID must be 1-23 characters long and must consist of only alphanumeric characters."
    ))]
    InvalidClientId,
    /// Only one of username and password was supplied.
    #[snafu(display("Username and password must be either both or neither provided."))]
    IncompleteCredentials,
}
108
/// Holds the `MqttOptions` from which MQTT clients are created.
#[derive(Clone)]
pub struct MqttConnector {
    /// Client options; `connect` clones them for every client it creates.
    pub options: MqttOptions,
}
115
116impl MqttConnector {
117 pub const fn new(options: MqttOptions) -> Self {
119 Self { options }
120 }
121
122 pub fn connect(&self) -> (AsyncClient, EventLoop) {
124 let (client, eventloop) = AsyncClient::new(self.options.clone(), 1024);
125 (client, eventloop)
126 }
127
128 pub async fn healthcheck(&self) -> crate::Result<()> {
130 Ok(())
131 }
132}