vector/common/
mqtt.rs

1use rumqttc::{AsyncClient, EventLoop, MqttOptions};
2use snafu::Snafu;
3use vector_config_macros::configurable_component;
4use vector_lib::tls::{TlsEnableableConfig, TlsError};
5
6use crate::template::TemplateParseError;
7
8/// Shared MQTT configuration for sources and sinks.
9#[configurable_component]
10#[derive(Clone, Debug, Derivative)]
11#[derivative(Default)]
12#[serde(deny_unknown_fields)]
13pub struct MqttCommonConfig {
14    /// MQTT server address (The broker’s domain name or IP address).
15    #[configurable(metadata(docs::examples = "mqtt.example.com", docs::examples = "127.0.0.1"))]
16    pub host: String,
17
18    /// TCP port of the MQTT server to connect to.
19    #[configurable(derived)]
20    #[serde(default = "default_port")]
21    #[derivative(Default(value = "default_port()"))]
22    pub port: u16,
23
24    /// MQTT username.
25    #[serde(default)]
26    #[configurable(derived)]
27    pub user: Option<String>,
28
29    /// MQTT password.
30    #[serde(default)]
31    #[configurable(derived)]
32    pub password: Option<String>,
33
34    /// MQTT client ID.
35    #[serde(default)]
36    #[configurable(derived)]
37    pub client_id: Option<String>,
38
39    /// Connection keep-alive interval.
40    #[serde(default = "default_keep_alive")]
41    #[derivative(Default(value = "default_keep_alive()"))]
42    pub keep_alive: u16,
43
44    /// TLS configuration.
45    #[configurable(derived)]
46    pub tls: Option<TlsEnableableConfig>,
47}
48
49const fn default_port() -> u16 {
50    1883
51}
52
53const fn default_keep_alive() -> u16 {
54    60
55}
56
57/// MQTT Error Types
58#[derive(Debug, Snafu)]
59#[snafu(visibility(pub))]
60pub enum MqttError {
61    /// Topic template parsing failed
62    #[snafu(display("invalid topic template: {source}"))]
63    TopicTemplate {
64        /// Source of error
65        source: TemplateParseError,
66    },
67    /// TLS error
68    #[snafu(display("TLS error: {source}"))]
69    Tls {
70        /// Source of error
71        source: TlsError,
72    },
73    /// Configuration error
74    #[snafu(display("MQTT configuration error: {source}"))]
75    Configuration {
76        /// Source of error
77        source: ConfigurationError,
78    },
79}
80
81/// MQTT Configuration error types
82#[derive(Clone, Debug, Eq, PartialEq, Snafu)]
83pub enum ConfigurationError {
84    /// Empty client ID error
85    #[snafu(display("Client ID is not allowed to be empty."))]
86    EmptyClientId,
87    /// Invalid credentials provided error
88    #[snafu(display("Username and password must be either both provided or both missing."))]
89    InvalidCredentials,
90    /// Invalid client ID provied error
91    #[snafu(display(
92        "Client ID must be 1-23 characters long and must consist of only alphanumeric characters."
93    ))]
94    InvalidClientId,
95    /// Credentials provided were incomplete
96    #[snafu(display("Username and password must be either both or neither provided."))]
97    IncompleteCredentials,
98}
99
100#[derive(Clone)]
101/// Mqtt connector wrapper
102pub struct MqttConnector {
103    /// Mqtt connection options
104    pub options: MqttOptions,
105}
106
107impl MqttConnector {
108    /// Creates a new MqttConnector
109    pub const fn new(options: MqttOptions) -> Self {
110        Self { options }
111    }
112
113    /// Connects the connector and generates a client and eventloop
114    pub fn connect(&self) -> (AsyncClient, EventLoop) {
115        let (client, eventloop) = AsyncClient::new(self.options.clone(), 1024);
116        (client, eventloop)
117    }
118
119    /// TODO: Right now there is no way to implement the healthcheck properly: <https://github.com/bytebeamio/rumqtt/issues/562>
120    pub async fn healthcheck(&self) -> crate::Result<()> {
121        Ok(())
122    }
123}