vector/sources/mqtt/
config.rs

1use std::time::Duration;
2
3use rand::Rng;
4use rumqttc::{MqttOptions, TlsConfiguration, Transport};
5use snafu::ResultExt;
6use vector_lib::{
7    codecs::decoding::{DeserializerConfig, FramingConfig},
8    config::{LegacyKey, LogNamespace},
9    configurable::configurable_component,
10    lookup::lookup_v2::OptionalValuePath,
11    lookup::owned_value_path,
12    tls::MaybeTlsSettings,
13};
14use vrl::value::Kind;
15
16use crate::{
17    codecs::DecodingConfig,
18    common::mqtt::{
19        ConfigurationError, ConfigurationSnafu, MqttCommonConfig, MqttConnector, MqttError,
20        TlsSnafu,
21    },
22    config::{SourceConfig, SourceContext, SourceOutput},
23    serde::{default_decoding, default_framing_message_based},
24};
25
26use super::source::MqttSource;
27
28/// Configuration for the `mqtt` source.
29#[configurable_component(source("mqtt", "Collect logs from MQTT."))]
30#[derive(Clone, Debug, Derivative)]
31#[derivative(Default)]
32#[serde(deny_unknown_fields)]
33pub struct MqttSourceConfig {
34    #[serde(flatten)]
35    pub common: MqttCommonConfig,
36
37    /// MQTT topic from which messages are to be read.
38    #[configurable(derived)]
39    #[serde(default = "default_topic")]
40    #[derivative(Default(value = "default_topic()"))]
41    pub topic: String,
42
43    #[configurable(derived)]
44    #[serde(default = "default_framing_message_based")]
45    #[derivative(Default(value = "default_framing_message_based()"))]
46    pub framing: FramingConfig,
47
48    #[configurable(derived)]
49    #[serde(default = "default_decoding")]
50    #[derivative(Default(value = "default_decoding()"))]
51    pub decoding: DeserializerConfig,
52
53    /// The namespace to use for logs. This overrides the global setting.
54    #[configurable(metadata(docs::hidden))]
55    #[serde(default)]
56    pub log_namespace: Option<bool>,
57
58    /// Overrides the name of the log field used to add the topic to each event.
59    ///
60    /// The value is the topic from which the MQTT message was published to.
61    ///
62    /// By default, `"topic"` is used.
63    #[serde(default = "default_topic_key")]
64    #[configurable(metadata(docs::examples = "topic"))]
65    pub topic_key: OptionalValuePath,
66}
67
68fn default_topic() -> String {
69    "vector".to_owned()
70}
71
72fn default_topic_key() -> OptionalValuePath {
73    OptionalValuePath::from(owned_value_path!("topic"))
74}
75
76#[async_trait::async_trait]
77#[typetag::serde(name = "mqtt")]
78impl SourceConfig for MqttSourceConfig {
79    async fn build(&self, cx: SourceContext) -> crate::Result<crate::sources::Source> {
80        let log_namespace = cx.log_namespace(self.log_namespace);
81
82        let connector = self.build_connector()?;
83
84        let decoder =
85            DecodingConfig::new(self.framing.clone(), self.decoding.clone(), log_namespace)
86                .build()?;
87
88        let sink = MqttSource::new(connector.clone(), decoder, log_namespace, self.clone())?;
89        Ok(Box::pin(sink.run(cx.out, cx.shutdown)))
90    }
91
92    fn outputs(&self, global_log_namespace: LogNamespace) -> Vec<SourceOutput> {
93        let schema_definition = self
94            .decoding
95            .schema_definition(global_log_namespace.merge(self.log_namespace))
96            .with_standard_vector_source_metadata()
97            .with_source_metadata(
98                Self::NAME,
99                Some(LegacyKey::Overwrite(owned_value_path!("timestamp"))),
100                &owned_value_path!("timestamp"),
101                Kind::timestamp().or_undefined(),
102                Some("timestamp"),
103            );
104
105        vec![SourceOutput::new_maybe_logs(
106            self.decoding.output_type(),
107            schema_definition,
108        )]
109    }
110
111    fn can_acknowledge(&self) -> bool {
112        false
113    }
114}
115
116impl MqttSourceConfig {
117    fn build_connector(&self) -> Result<MqttConnector, MqttError> {
118        let client_id = self.common.client_id.clone().unwrap_or_else(|| {
119            let hash = rand::rng()
120                .sample_iter(&rand_distr::Alphanumeric)
121                .take(6)
122                .map(char::from)
123                .collect::<String>();
124            format!("vectorSource{hash}")
125        });
126
127        if client_id.is_empty() {
128            return Err(ConfigurationError::InvalidClientId).context(ConfigurationSnafu);
129        }
130
131        let tls =
132            MaybeTlsSettings::from_config(self.common.tls.as_ref(), false).context(TlsSnafu)?;
133        let mut options = MqttOptions::new(client_id, &self.common.host, self.common.port);
134        options.set_keep_alive(Duration::from_secs(self.common.keep_alive.into()));
135        options.set_max_packet_size(self.common.max_packet_size, self.common.max_packet_size);
136
137        options.set_clean_session(false);
138        match (&self.common.user, &self.common.password) {
139            (Some(user), Some(password)) => {
140                options.set_credentials(user, password);
141            }
142            (None, None) => {
143                // Credentials were not provided
144            }
145            _ => {
146                // We need either both username and password, or neither. MQTT also allows for providing only password, but rumqttc does not allow that so we cannot either.
147                return Err(ConfigurationError::IncompleteCredentials).context(ConfigurationSnafu);
148            }
149        }
150
151        if let Some(tls) = tls.tls() {
152            let ca = tls.authorities_pem().flatten().collect();
153            let client_auth = None;
154            let alpn = Some(vec!["mqtt".into()]);
155            options.set_transport(Transport::Tls(TlsConfiguration::Simple {
156                ca,
157                client_auth,
158                alpn,
159            }));
160        }
161
162        Ok(MqttConnector::new(options))
163    }
164}
165
166impl_generate_config_from_default!(MqttSourceConfig);
167
168#[cfg(test)]
169mod test {
170    use super::*;
171
172    #[test]
173    fn generate_config() {
174        crate::test_util::test_generate_config::<MqttSourceConfig>();
175    }
176}