vector/sources/mqtt/
config.rs

1use std::time::Duration;
2
3use rand::Rng;
4use rumqttc::{MqttOptions, TlsConfiguration, Transport};
5use snafu::ResultExt;
6use vector_lib::{
7    codecs::decoding::{DeserializerConfig, FramingConfig},
8    config::{LegacyKey, LogNamespace},
9    configurable::configurable_component,
10    lookup::lookup_v2::OptionalValuePath,
11    lookup::owned_value_path,
12    tls::MaybeTlsSettings,
13};
14use vrl::value::Kind;
15
16use crate::{
17    codecs::DecodingConfig,
18    common::mqtt::{
19        ConfigurationError, ConfigurationSnafu, MqttCommonConfig, MqttConnector, MqttError,
20        TlsSnafu,
21    },
22    config::{SourceConfig, SourceContext, SourceOutput},
23    serde::{default_decoding, default_framing_message_based},
24};
25
26use super::source::MqttSource;
27
28/// Configuration for the `mqtt` source.
29#[configurable_component(source("mqtt", "Collect logs from MQTT."))]
30#[derive(Clone, Debug, Derivative)]
31#[derivative(Default)]
32#[serde(deny_unknown_fields)]
33pub struct MqttSourceConfig {
34    #[serde(flatten)]
35    pub common: MqttCommonConfig,
36
37    /// MQTT topic from which messages are to be read.
38    #[configurable(derived)]
39    #[serde(default = "default_topic")]
40    #[derivative(Default(value = "default_topic()"))]
41    pub topic: String,
42
43    #[configurable(derived)]
44    #[serde(default = "default_framing_message_based")]
45    #[derivative(Default(value = "default_framing_message_based()"))]
46    pub framing: FramingConfig,
47
48    #[configurable(derived)]
49    #[serde(default = "default_decoding")]
50    #[derivative(Default(value = "default_decoding()"))]
51    pub decoding: DeserializerConfig,
52
53    /// The namespace to use for logs. This overrides the global setting.
54    #[configurable(metadata(docs::hidden))]
55    #[serde(default)]
56    pub log_namespace: Option<bool>,
57
58    /// Overrides the name of the log field used to add the topic to each event.
59    ///
60    /// The value is the topic from which the MQTT message was published to.
61    ///
62    /// By default, `"topic"` is used.
63    #[serde(default = "default_topic_key")]
64    #[configurable(metadata(docs::examples = "topic"))]
65    pub topic_key: OptionalValuePath,
66}
67
/// Returns the default MQTT topic name, `"vector"`.
fn default_topic() -> String {
    String::from("vector")
}
71
72fn default_topic_key() -> OptionalValuePath {
73    OptionalValuePath::from(owned_value_path!("topic"))
74}
75
76#[async_trait::async_trait]
77#[typetag::serde(name = "mqtt")]
78impl SourceConfig for MqttSourceConfig {
79    async fn build(&self, cx: SourceContext) -> crate::Result<crate::sources::Source> {
80        let log_namespace = cx.log_namespace(self.log_namespace);
81
82        let connector = self.build_connector()?;
83
84        let decoder =
85            DecodingConfig::new(self.framing.clone(), self.decoding.clone(), log_namespace)
86                .build()?;
87
88        let sink = MqttSource::new(connector.clone(), decoder, log_namespace, self.clone())?;
89        Ok(Box::pin(sink.run(cx.out, cx.shutdown)))
90    }
91
92    fn outputs(&self, global_log_namespace: LogNamespace) -> Vec<SourceOutput> {
93        let schema_definition = self
94            .decoding
95            .schema_definition(global_log_namespace.merge(self.log_namespace))
96            .with_standard_vector_source_metadata()
97            .with_source_metadata(
98                Self::NAME,
99                Some(LegacyKey::Overwrite(owned_value_path!("timestamp"))),
100                &owned_value_path!("timestamp"),
101                Kind::timestamp().or_undefined(),
102                Some("timestamp"),
103            );
104
105        vec![SourceOutput::new_maybe_logs(
106            self.decoding.output_type(),
107            schema_definition,
108        )]
109    }
110
111    fn can_acknowledge(&self) -> bool {
112        false
113    }
114}
115
116impl MqttSourceConfig {
117    fn build_connector(&self) -> Result<MqttConnector, MqttError> {
118        let client_id = self.common.client_id.clone().unwrap_or_else(|| {
119            let hash = rand::rng()
120                .sample_iter(&rand_distr::Alphanumeric)
121                .take(6)
122                .map(char::from)
123                .collect::<String>();
124            format!("vectorSource{hash}")
125        });
126
127        if client_id.is_empty() {
128            return Err(ConfigurationError::InvalidClientId).context(ConfigurationSnafu);
129        }
130
131        let tls =
132            MaybeTlsSettings::from_config(self.common.tls.as_ref(), false).context(TlsSnafu)?;
133        let mut options = MqttOptions::new(client_id, &self.common.host, self.common.port);
134        options.set_keep_alive(Duration::from_secs(self.common.keep_alive.into()));
135
136        options.set_clean_session(false);
137        match (&self.common.user, &self.common.password) {
138            (Some(user), Some(password)) => {
139                options.set_credentials(user, password);
140            }
141            (None, None) => {
142                // Credentials were not provided
143            }
144            _ => {
145                // We need either both username and password, or neither. MQTT also allows for providing only password, but rumqttc does not allow that so we cannot either.
146                return Err(ConfigurationError::IncompleteCredentials).context(ConfigurationSnafu);
147            }
148        }
149
150        if let Some(tls) = tls.tls() {
151            let ca = tls.authorities_pem().flatten().collect();
152            let client_auth = None;
153            let alpn = Some(vec!["mqtt".into()]);
154            options.set_transport(Transport::Tls(TlsConfiguration::Simple {
155                ca,
156                client_auth,
157                alpn,
158            }));
159        }
160
161        Ok(MqttConnector::new(options))
162    }
163}
164
// NOTE(review): going by the macro's name, this presumably provides config generation for
// `MqttSourceConfig` based on its `Default` implementation — confirm against the macro
// definition.
impl_generate_config_from_default!(MqttSourceConfig);
166
#[cfg(test)]
mod test {
    use super::*;

    // Runs the crate's shared `test_generate_config` helper for this source's config type.
    // NOTE(review): presumably the helper checks that the generated example config is
    // valid — confirm in `crate::test_util`.
    #[test]
    fn generate_config() {
        crate::test_util::test_generate_config::<MqttSourceConfig>();
    }
}