vector/sources/mqtt/
config.rs

1use std::time::Duration;
2
3use rand::Rng;
4use rumqttc::{MqttOptions, TlsConfiguration, Transport};
5use snafu::ResultExt;
6use vector_lib::{
7    codecs::decoding::{DeserializerConfig, FramingConfig},
8    config::{LegacyKey, LogNamespace},
9    configurable::configurable_component,
10    lookup::{lookup_v2::OptionalValuePath, owned_value_path},
11    tls::MaybeTlsSettings,
12};
13use vrl::value::Kind;
14
15use super::source::MqttSource;
16use crate::{
17    codecs::DecodingConfig,
18    common::mqtt::{
19        ConfigurationError, ConfigurationSnafu, MqttCommonConfig, MqttConnector, MqttError,
20        TlsSnafu,
21    },
22    config::{SourceConfig, SourceContext, SourceOutput},
23    serde::{default_decoding, default_framing_message_based},
24};
25
26/// Configuration for the `mqtt` source.
27#[configurable_component(source("mqtt", "Collect logs from MQTT."))]
28#[derive(Clone, Debug, Derivative)]
29#[derivative(Default)]
30#[serde(deny_unknown_fields)]
31pub struct MqttSourceConfig {
32    #[serde(flatten)]
33    pub common: MqttCommonConfig,
34
35    /// MQTT topic from which messages are to be read.
36    #[configurable(derived)]
37    #[serde(default = "default_topic")]
38    #[derivative(Default(value = "default_topic()"))]
39    pub topic: String,
40
41    #[configurable(derived)]
42    #[serde(default = "default_framing_message_based")]
43    #[derivative(Default(value = "default_framing_message_based()"))]
44    pub framing: FramingConfig,
45
46    #[configurable(derived)]
47    #[serde(default = "default_decoding")]
48    #[derivative(Default(value = "default_decoding()"))]
49    pub decoding: DeserializerConfig,
50
51    /// The namespace to use for logs. This overrides the global setting.
52    #[configurable(metadata(docs::hidden))]
53    #[serde(default)]
54    pub log_namespace: Option<bool>,
55
56    /// Overrides the name of the log field used to add the topic to each event.
57    ///
58    /// The value is the topic from which the MQTT message was published to.
59    ///
60    /// By default, `"topic"` is used.
61    #[serde(default = "default_topic_key")]
62    #[configurable(metadata(docs::examples = "topic"))]
63    pub topic_key: OptionalValuePath,
64}
65
66fn default_topic() -> String {
67    "vector".to_owned()
68}
69
70fn default_topic_key() -> OptionalValuePath {
71    OptionalValuePath::from(owned_value_path!("topic"))
72}
73
74#[async_trait::async_trait]
75#[typetag::serde(name = "mqtt")]
76impl SourceConfig for MqttSourceConfig {
77    async fn build(&self, cx: SourceContext) -> crate::Result<crate::sources::Source> {
78        let log_namespace = cx.log_namespace(self.log_namespace);
79
80        let connector = self.build_connector()?;
81
82        let decoder =
83            DecodingConfig::new(self.framing.clone(), self.decoding.clone(), log_namespace)
84                .build()?;
85
86        let sink = MqttSource::new(connector.clone(), decoder, log_namespace, self.clone())?;
87        Ok(Box::pin(sink.run(cx.out, cx.shutdown)))
88    }
89
90    fn outputs(&self, global_log_namespace: LogNamespace) -> Vec<SourceOutput> {
91        let schema_definition = self
92            .decoding
93            .schema_definition(global_log_namespace.merge(self.log_namespace))
94            .with_standard_vector_source_metadata()
95            .with_source_metadata(
96                Self::NAME,
97                Some(LegacyKey::Overwrite(owned_value_path!("timestamp"))),
98                &owned_value_path!("timestamp"),
99                Kind::timestamp().or_undefined(),
100                Some("timestamp"),
101            );
102
103        vec![SourceOutput::new_maybe_logs(
104            self.decoding.output_type(),
105            schema_definition,
106        )]
107    }
108
109    fn can_acknowledge(&self) -> bool {
110        false
111    }
112}
113
114impl MqttSourceConfig {
115    fn build_connector(&self) -> Result<MqttConnector, MqttError> {
116        let client_id = self.common.client_id.clone().unwrap_or_else(|| {
117            let hash = rand::rng()
118                .sample_iter(&rand_distr::Alphanumeric)
119                .take(6)
120                .map(char::from)
121                .collect::<String>();
122            format!("vectorSource{hash}")
123        });
124
125        if client_id.is_empty() {
126            return Err(ConfigurationError::InvalidClientId).context(ConfigurationSnafu);
127        }
128
129        let tls =
130            MaybeTlsSettings::from_config(self.common.tls.as_ref(), false).context(TlsSnafu)?;
131        let mut options = MqttOptions::new(client_id, &self.common.host, self.common.port);
132        options.set_keep_alive(Duration::from_secs(self.common.keep_alive.into()));
133        options.set_max_packet_size(self.common.max_packet_size, self.common.max_packet_size);
134
135        options.set_clean_session(false);
136        match (&self.common.user, &self.common.password) {
137            (Some(user), Some(password)) => {
138                options.set_credentials(user, password);
139            }
140            (None, None) => {
141                // Credentials were not provided
142            }
143            _ => {
144                // We need either both username and password, or neither. MQTT also allows for providing only password, but rumqttc does not allow that so we cannot either.
145                return Err(ConfigurationError::IncompleteCredentials).context(ConfigurationSnafu);
146            }
147        }
148
149        if let Some(tls) = tls.tls() {
150            let ca = tls.authorities_pem().flatten().collect();
151            let client_auth = None;
152            let alpn = Some(vec!["mqtt".into()]);
153            options.set_transport(Transport::Tls(TlsConfiguration::Simple {
154                ca,
155                client_auth,
156                alpn,
157            }));
158        }
159
160        Ok(MqttConnector::new(options))
161    }
162}
163
164impl_generate_config_from_default!(MqttSourceConfig);
165
166#[cfg(test)]
167mod test {
168    use super::*;
169
170    #[test]
171    fn generate_config() {
172        crate::test_util::test_generate_config::<MqttSourceConfig>();
173    }
174}