vector/sources/mqtt/
config.rs1use std::time::Duration;
2
3use rand::Rng;
4use rumqttc::{MqttOptions, TlsConfiguration, Transport};
5use snafu::ResultExt;
6use vector_lib::{
7 codecs::decoding::{DeserializerConfig, FramingConfig},
8 config::{LegacyKey, LogNamespace},
9 configurable::configurable_component,
10 lookup::{lookup_v2::OptionalValuePath, owned_value_path},
11 tls::MaybeTlsSettings,
12};
13use vrl::value::Kind;
14
15use super::source::MqttSource;
16use crate::{
17 codecs::DecodingConfig,
18 common::mqtt::{
19 ConfigurationError, ConfigurationSnafu, MqttCommonConfig, MqttConnector, MqttError,
20 TlsSnafu,
21 },
22 config::{SourceConfig, SourceContext, SourceOutput},
23 serde::{default_decoding, default_framing_message_based},
24};
25
26#[configurable_component(source("mqtt", "Collect logs from MQTT."))]
28#[derive(Clone, Debug, Derivative)]
29#[derivative(Default)]
30#[serde(deny_unknown_fields)]
31pub struct MqttSourceConfig {
32 #[serde(flatten)]
33 pub common: MqttCommonConfig,
34
35 #[configurable(derived)]
37 #[serde(default = "default_topic")]
38 #[derivative(Default(value = "default_topic()"))]
39 pub topic: String,
40
41 #[configurable(derived)]
42 #[serde(default = "default_framing_message_based")]
43 #[derivative(Default(value = "default_framing_message_based()"))]
44 pub framing: FramingConfig,
45
46 #[configurable(derived)]
47 #[serde(default = "default_decoding")]
48 #[derivative(Default(value = "default_decoding()"))]
49 pub decoding: DeserializerConfig,
50
51 #[configurable(metadata(docs::hidden))]
53 #[serde(default)]
54 pub log_namespace: Option<bool>,
55
56 #[serde(default = "default_topic_key")]
62 #[configurable(metadata(docs::examples = "topic"))]
63 pub topic_key: OptionalValuePath,
64}
65
66fn default_topic() -> String {
67 "vector".to_owned()
68}
69
70fn default_topic_key() -> OptionalValuePath {
71 OptionalValuePath::from(owned_value_path!("topic"))
72}
73
74#[async_trait::async_trait]
75#[typetag::serde(name = "mqtt")]
76impl SourceConfig for MqttSourceConfig {
77 async fn build(&self, cx: SourceContext) -> crate::Result<crate::sources::Source> {
78 let log_namespace = cx.log_namespace(self.log_namespace);
79
80 let connector = self.build_connector()?;
81
82 let decoder =
83 DecodingConfig::new(self.framing.clone(), self.decoding.clone(), log_namespace)
84 .build()?;
85
86 let sink = MqttSource::new(connector.clone(), decoder, log_namespace, self.clone())?;
87 Ok(Box::pin(sink.run(cx.out, cx.shutdown)))
88 }
89
90 fn outputs(&self, global_log_namespace: LogNamespace) -> Vec<SourceOutput> {
91 let schema_definition = self
92 .decoding
93 .schema_definition(global_log_namespace.merge(self.log_namespace))
94 .with_standard_vector_source_metadata()
95 .with_source_metadata(
96 Self::NAME,
97 Some(LegacyKey::Overwrite(owned_value_path!("timestamp"))),
98 &owned_value_path!("timestamp"),
99 Kind::timestamp().or_undefined(),
100 Some("timestamp"),
101 );
102
103 vec![SourceOutput::new_maybe_logs(
104 self.decoding.output_type(),
105 schema_definition,
106 )]
107 }
108
109 fn can_acknowledge(&self) -> bool {
110 false
111 }
112}
113
114impl MqttSourceConfig {
115 fn build_connector(&self) -> Result<MqttConnector, MqttError> {
116 let client_id = self.common.client_id.clone().unwrap_or_else(|| {
117 let hash = rand::rng()
118 .sample_iter(&rand_distr::Alphanumeric)
119 .take(6)
120 .map(char::from)
121 .collect::<String>();
122 format!("vectorSource{hash}")
123 });
124
125 if client_id.is_empty() {
126 return Err(ConfigurationError::InvalidClientId).context(ConfigurationSnafu);
127 }
128
129 let tls =
130 MaybeTlsSettings::from_config(self.common.tls.as_ref(), false).context(TlsSnafu)?;
131 let mut options = MqttOptions::new(client_id, &self.common.host, self.common.port);
132 options.set_keep_alive(Duration::from_secs(self.common.keep_alive.into()));
133 options.set_max_packet_size(self.common.max_packet_size, self.common.max_packet_size);
134
135 options.set_clean_session(false);
136 match (&self.common.user, &self.common.password) {
137 (Some(user), Some(password)) => {
138 options.set_credentials(user, password);
139 }
140 (None, None) => {
141 }
143 _ => {
144 return Err(ConfigurationError::IncompleteCredentials).context(ConfigurationSnafu);
146 }
147 }
148
149 if let Some(tls) = tls.tls() {
150 let ca = tls.authorities_pem().flatten().collect();
151 let client_auth = None;
152 let alpn = Some(vec!["mqtt".into()]);
153 options.set_transport(Transport::Tls(TlsConfiguration::Simple {
154 ca,
155 client_auth,
156 alpn,
157 }));
158 }
159
160 Ok(MqttConnector::new(options))
161 }
162}
163
164impl_generate_config_from_default!(MqttSourceConfig);
165
166#[cfg(test)]
167mod test {
168 use super::*;
169
170 #[test]
171 fn generate_config() {
172 crate::test_util::test_generate_config::<MqttSourceConfig>();
173 }
174}