1use rumqttc::{AsyncClient, EventLoop, MqttOptions};
2use snafu::Snafu;
3use vector_config_macros::configurable_component;
4use vector_lib::tls::{TlsEnableableConfig, TlsError};
5
6use crate::template::TemplateParseError;
7
8#[configurable_component]
10#[derive(Clone, Debug, Derivative)]
11#[derivative(Default)]
12#[serde(deny_unknown_fields)]
13pub struct MqttCommonConfig {
14 #[configurable(metadata(docs::examples = "mqtt.example.com", docs::examples = "127.0.0.1"))]
16 pub host: String,
17
18 #[configurable(derived)]
20 #[serde(default = "default_port")]
21 #[derivative(Default(value = "default_port()"))]
22 pub port: u16,
23
24 #[serde(default)]
26 #[configurable(derived)]
27 pub user: Option<String>,
28
29 #[serde(default)]
31 #[configurable(derived)]
32 pub password: Option<String>,
33
34 #[serde(default)]
36 #[configurable(derived)]
37 pub client_id: Option<String>,
38
39 #[serde(default = "default_keep_alive")]
41 #[derivative(Default(value = "default_keep_alive()"))]
42 pub keep_alive: u16,
43
44 #[configurable(derived)]
46 pub tls: Option<TlsEnableableConfig>,
47}
48
/// Returns the default MQTT server port, `1883`.
///
/// Referenced by the `port` field of `MqttCommonConfig` as both its serde
/// default and its `Default` value.
const fn default_port() -> u16 {
    1883
}
52
/// Returns the default keep-alive value, `60`.
///
/// Referenced by the `keep_alive` field of `MqttCommonConfig` as both its
/// serde default and its `Default` value.
// NOTE(review): the unit is presumably seconds -- confirm against the code that
// feeds this value into `MqttOptions`.
const fn default_keep_alive() -> u16 {
    60
}
56
57#[derive(Debug, Snafu)]
59#[snafu(visibility(pub))]
60pub enum MqttError {
61 #[snafu(display("invalid topic template: {source}"))]
63 TopicTemplate {
64 source: TemplateParseError,
66 },
67 #[snafu(display("TLS error: {source}"))]
69 Tls {
70 source: TlsError,
72 },
73 #[snafu(display("MQTT configuration error: {source}"))]
75 Configuration {
76 source: ConfigurationError,
78 },
79}
80
81#[derive(Clone, Debug, Eq, PartialEq, Snafu)]
83pub enum ConfigurationError {
84 #[snafu(display("Client ID is not allowed to be empty."))]
86 EmptyClientId,
87 #[snafu(display("Username and password must be either both provided or both missing."))]
89 InvalidCredentials,
90 #[snafu(display(
92 "Client ID must be 1-23 characters long and must consist of only alphanumeric characters."
93 ))]
94 InvalidClientId,
95 #[snafu(display("Username and password must be either both or neither provided."))]
97 IncompleteCredentials,
98}
99
100#[derive(Clone)]
101pub struct MqttConnector {
103 pub options: MqttOptions,
105}
106
107impl MqttConnector {
108 pub const fn new(options: MqttOptions) -> Self {
110 Self { options }
111 }
112
113 pub fn connect(&self) -> (AsyncClient, EventLoop) {
115 let (client, eventloop) = AsyncClient::new(self.options.clone(), 1024);
116 (client, eventloop)
117 }
118
119 pub async fn healthcheck(&self) -> crate::Result<()> {
121 Ok(())
122 }
123}