vector/sources/mqtt/
config.rs1use std::time::Duration;
2
3use rand::Rng;
4use rumqttc::{MqttOptions, TlsConfiguration, Transport};
5use snafu::ResultExt;
6use vector_lib::{
7 codecs::decoding::{DeserializerConfig, FramingConfig},
8 config::{LegacyKey, LogNamespace},
9 configurable::configurable_component,
10 lookup::lookup_v2::OptionalValuePath,
11 lookup::owned_value_path,
12 tls::MaybeTlsSettings,
13};
14use vrl::value::Kind;
15
16use crate::{
17 codecs::DecodingConfig,
18 common::mqtt::{
19 ConfigurationError, ConfigurationSnafu, MqttCommonConfig, MqttConnector, MqttError,
20 TlsSnafu,
21 },
22 config::{SourceConfig, SourceContext, SourceOutput},
23 serde::{default_decoding, default_framing_message_based},
24};
25
26use super::source::MqttSource;
27
28#[configurable_component(source("mqtt", "Collect logs from MQTT."))]
30#[derive(Clone, Debug, Derivative)]
31#[derivative(Default)]
32#[serde(deny_unknown_fields)]
33pub struct MqttSourceConfig {
34 #[serde(flatten)]
35 pub common: MqttCommonConfig,
36
37 #[configurable(derived)]
39 #[serde(default = "default_topic")]
40 #[derivative(Default(value = "default_topic()"))]
41 pub topic: String,
42
43 #[configurable(derived)]
44 #[serde(default = "default_framing_message_based")]
45 #[derivative(Default(value = "default_framing_message_based()"))]
46 pub framing: FramingConfig,
47
48 #[configurable(derived)]
49 #[serde(default = "default_decoding")]
50 #[derivative(Default(value = "default_decoding()"))]
51 pub decoding: DeserializerConfig,
52
53 #[configurable(metadata(docs::hidden))]
55 #[serde(default)]
56 pub log_namespace: Option<bool>,
57
58 #[serde(default = "default_topic_key")]
64 #[configurable(metadata(docs::examples = "topic"))]
65 pub topic_key: OptionalValuePath,
66}
67
/// Returns the default value of the `topic` setting: `"vector"`.
fn default_topic() -> String {
    String::from("vector")
}
71
72fn default_topic_key() -> OptionalValuePath {
73 OptionalValuePath::from(owned_value_path!("topic"))
74}
75
76#[async_trait::async_trait]
77#[typetag::serde(name = "mqtt")]
78impl SourceConfig for MqttSourceConfig {
79 async fn build(&self, cx: SourceContext) -> crate::Result<crate::sources::Source> {
80 let log_namespace = cx.log_namespace(self.log_namespace);
81
82 let connector = self.build_connector()?;
83
84 let decoder =
85 DecodingConfig::new(self.framing.clone(), self.decoding.clone(), log_namespace)
86 .build()?;
87
88 let sink = MqttSource::new(connector.clone(), decoder, log_namespace, self.clone())?;
89 Ok(Box::pin(sink.run(cx.out, cx.shutdown)))
90 }
91
92 fn outputs(&self, global_log_namespace: LogNamespace) -> Vec<SourceOutput> {
93 let schema_definition = self
94 .decoding
95 .schema_definition(global_log_namespace.merge(self.log_namespace))
96 .with_standard_vector_source_metadata()
97 .with_source_metadata(
98 Self::NAME,
99 Some(LegacyKey::Overwrite(owned_value_path!("timestamp"))),
100 &owned_value_path!("timestamp"),
101 Kind::timestamp().or_undefined(),
102 Some("timestamp"),
103 );
104
105 vec![SourceOutput::new_maybe_logs(
106 self.decoding.output_type(),
107 schema_definition,
108 )]
109 }
110
111 fn can_acknowledge(&self) -> bool {
112 false
113 }
114}
115
116impl MqttSourceConfig {
117 fn build_connector(&self) -> Result<MqttConnector, MqttError> {
118 let client_id = self.common.client_id.clone().unwrap_or_else(|| {
119 let hash = rand::rng()
120 .sample_iter(&rand_distr::Alphanumeric)
121 .take(6)
122 .map(char::from)
123 .collect::<String>();
124 format!("vectorSource{hash}")
125 });
126
127 if client_id.is_empty() {
128 return Err(ConfigurationError::InvalidClientId).context(ConfigurationSnafu);
129 }
130
131 let tls =
132 MaybeTlsSettings::from_config(self.common.tls.as_ref(), false).context(TlsSnafu)?;
133 let mut options = MqttOptions::new(client_id, &self.common.host, self.common.port);
134 options.set_keep_alive(Duration::from_secs(self.common.keep_alive.into()));
135
136 options.set_clean_session(false);
137 match (&self.common.user, &self.common.password) {
138 (Some(user), Some(password)) => {
139 options.set_credentials(user, password);
140 }
141 (None, None) => {
142 }
144 _ => {
145 return Err(ConfigurationError::IncompleteCredentials).context(ConfigurationSnafu);
147 }
148 }
149
150 if let Some(tls) = tls.tls() {
151 let ca = tls.authorities_pem().flatten().collect();
152 let client_auth = None;
153 let alpn = Some(vec!["mqtt".into()]);
154 options.set_transport(Transport::Tls(TlsConfiguration::Simple {
155 ca,
156 client_auth,
157 alpn,
158 }));
159 }
160
161 Ok(MqttConnector::new(options))
162 }
163}
164
// Macro defined elsewhere in the crate, invoked for `MqttSourceConfig`.
// NOTE(review): presumably provides config generation based on the type's
// `Default` impl (exercised by the `generate_config` test below) — confirm.
165impl_generate_config_from_default!(MqttSourceConfig);
166
#[cfg(test)]
mod test {
    use super::MqttSourceConfig;

    /// Runs the crate's config-generation test helper for `MqttSourceConfig`.
    #[test]
    fn generate_config() {
        crate::test_util::test_generate_config::<MqttSourceConfig>();
    }
}