vector/common/
mqtt.rs

1use rumqttc::{AsyncClient, EventLoop, MqttOptions};
2use snafu::Snafu;
3use vector_config_macros::configurable_component;
4use vector_lib::tls::{TlsEnableableConfig, TlsError};
5
6use crate::template::TemplateParseError;
7
8/// Shared MQTT configuration for sources and sinks.
9#[configurable_component]
10#[derive(Clone, Debug, Derivative)]
11#[derivative(Default)]
12#[serde(deny_unknown_fields)]
13pub struct MqttCommonConfig {
14    /// MQTT server address (The broker’s domain name or IP address).
15    #[configurable(metadata(docs::examples = "mqtt.example.com", docs::examples = "127.0.0.1"))]
16    pub host: String,
17
18    /// TCP port of the MQTT server to connect to.
19    #[configurable(derived)]
20    #[serde(default = "default_port")]
21    #[derivative(Default(value = "default_port()"))]
22    pub port: u16,
23
24    /// MQTT username.
25    #[serde(default)]
26    #[configurable(derived)]
27    pub user: Option<String>,
28
29    /// MQTT password.
30    #[serde(default)]
31    #[configurable(derived)]
32    pub password: Option<String>,
33
34    /// MQTT client ID.
35    #[serde(default)]
36    #[configurable(derived)]
37    pub client_id: Option<String>,
38
39    /// Connection keep-alive interval.
40    #[serde(default = "default_keep_alive")]
41    #[derivative(Default(value = "default_keep_alive()"))]
42    pub keep_alive: u16,
43
44    /// Maximum packet size
45    #[serde(default = "default_max_packet_size")]
46    #[derivative(Default(value = "default_max_packet_size()"))]
47    pub max_packet_size: usize,
48
49    /// TLS configuration.
50    #[configurable(derived)]
51    pub tls: Option<TlsEnableableConfig>,
52}
53
/// Returns the default MQTT server port, `1883` (the registered port for unencrypted MQTT).
///
/// Used as both the serde default and the `Derivative` default of `MqttCommonConfig::port`.
const fn default_port() -> u16 {
    1883
}
57
/// Returns the default keep-alive interval, `60`.
///
/// Used as both the serde default and the `Derivative` default of
/// `MqttCommonConfig::keep_alive`. The unit is presumably seconds — confirm where the
/// value is consumed.
const fn default_keep_alive() -> u16 {
    60
}
61
/// Returns the default maximum packet size, `10 * 1024` (10240).
///
/// Used as both the serde default and the `Derivative` default of
/// `MqttCommonConfig::max_packet_size`. The unit is presumably bytes (i.e. 10 KiB) —
/// confirm where the value is consumed.
const fn default_max_packet_size() -> usize {
    10 * 1024
}
65
66/// MQTT Error Types
67#[derive(Debug, Snafu)]
68#[snafu(visibility(pub))]
69pub enum MqttError {
70    /// Topic template parsing failed
71    #[snafu(display("invalid topic template: {source}"))]
72    TopicTemplate {
73        /// Source of error
74        source: TemplateParseError,
75    },
76    /// TLS error
77    #[snafu(display("TLS error: {source}"))]
78    Tls {
79        /// Source of error
80        source: TlsError,
81    },
82    /// Configuration error
83    #[snafu(display("MQTT configuration error: {source}"))]
84    Configuration {
85        /// Source of error
86        source: ConfigurationError,
87    },
88}
89
90/// MQTT Configuration error types
91#[derive(Clone, Debug, Eq, PartialEq, Snafu)]
92pub enum ConfigurationError {
93    /// Empty client ID error
94    #[snafu(display("Client ID is not allowed to be empty."))]
95    EmptyClientId,
96    /// Invalid credentials provided error
97    #[snafu(display("Username and password must be either both provided or both missing."))]
98    InvalidCredentials,
99    /// Invalid client ID provied error
100    #[snafu(display(
101        "Client ID must be 1-23 characters long and must consist of only alphanumeric characters."
102    ))]
103    InvalidClientId,
104    /// Credentials provided were incomplete
105    #[snafu(display("Username and password must be either both or neither provided."))]
106    IncompleteCredentials,
107}
108
109#[derive(Clone)]
110/// Mqtt connector wrapper
111pub struct MqttConnector {
112    /// Mqtt connection options
113    pub options: MqttOptions,
114}
115
116impl MqttConnector {
117    /// Creates a new MqttConnector
118    pub const fn new(options: MqttOptions) -> Self {
119        Self { options }
120    }
121
122    /// Connects the connector and generates a client and eventloop
123    pub fn connect(&self) -> (AsyncClient, EventLoop) {
124        let (client, eventloop) = AsyncClient::new(self.options.clone(), 1024);
125        (client, eventloop)
126    }
127
128    /// TODO: Right now there is no way to implement the healthcheck properly: <https://github.com/bytebeamio/rumqtt/issues/562>
129    pub async fn healthcheck(&self) -> crate::Result<()> {
130        Ok(())
131    }
132}