vector/
nats.rs

1//! Shared helper functions for NATS source and sink.
2#![allow(missing_docs)]
3
4use nkeys::error::Error as NKeysError;
5use snafu::{ResultExt, Snafu};
6use vector_lib::{configurable::configurable_component, sensitive_string::SensitiveString};
7
8use crate::tls::TlsEnableableConfig;
9
10/// Errors that can occur during NATS configuration.
11#[derive(Debug, Snafu)]
12pub enum NatsConfigError {
13    #[snafu(display("NATS Auth Config Error: {}", source))]
14    AuthConfigError { source: NKeysError },
15    #[snafu(display("NATS TLS Config Error: missing key"))]
16    TlsMissingKey,
17    #[snafu(display("NATS TLS Config Error: missing cert"))]
18    TlsMissingCert,
19    #[snafu(display("NATS Credentials file error"))]
20    CredentialsFileError { source: std::io::Error },
21}
22
23/// Configuration of the authentication strategy when interacting with NATS.
24#[configurable_component]
25#[derive(Clone, Debug)]
26#[serde(rename_all = "snake_case", tag = "strategy")]
27#[configurable(metadata(
28    docs::enum_tag_description = "The strategy used to authenticate with the NATS server.
29
30More information on NATS authentication, and the various authentication strategies, can be found in the
31NATS [documentation][nats_auth_docs]. For TLS client certificate authentication specifically, see the
32`tls` settings.
33
34[nats_auth_docs]: https://docs.nats.io/running-a-nats-service/configuration/securing_nats/auth_intro"
35))]
36pub enum NatsAuthConfig {
37    /// Username/password authentication.
38    UserPassword {
39        #[configurable(derived)]
40        user_password: NatsAuthUserPassword,
41    },
42
43    /// Token authentication.
44    Token {
45        #[configurable(derived)]
46        token: NatsAuthToken,
47    },
48
49    /// Credentials file authentication. (JWT-based)
50    CredentialsFile {
51        #[configurable(derived)]
52        credentials_file: NatsAuthCredentialsFile,
53    },
54
55    /// NKey authentication.
56    Nkey {
57        #[configurable(derived)]
58        nkey: NatsAuthNKey,
59    },
60}
61
62impl std::fmt::Display for NatsAuthConfig {
63    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
64        use NatsAuthConfig::*;
65        let word = match self {
66            UserPassword { .. } => "user_password",
67            Token { .. } => "token",
68            CredentialsFile { .. } => "credentials_file",
69            Nkey { .. } => "nkey",
70        };
71        write!(f, "{word}")
72    }
73}
74
75/// Username and password configuration.
76#[configurable_component]
77#[derive(Clone, Debug)]
78#[serde(deny_unknown_fields)]
79pub struct NatsAuthUserPassword {
80    /// Username.
81    pub(crate) user: String,
82
83    /// Password.
84    pub(crate) password: SensitiveString,
85}
86
87/// Token configuration.
88#[configurable_component]
89#[derive(Clone, Debug)]
90#[serde(deny_unknown_fields)]
91pub struct NatsAuthToken {
92    /// Token.
93    pub(crate) value: SensitiveString,
94}
95
96/// Credentials file configuration.
97#[configurable_component]
98#[derive(Clone, Debug)]
99#[serde(deny_unknown_fields)]
100pub struct NatsAuthCredentialsFile {
101    /// Path to credentials file.
102    #[configurable(metadata(docs::examples = "/etc/nats/nats.creds"))]
103    pub(crate) path: String,
104}
105
106/// NKeys configuration.
107#[configurable_component]
108#[derive(Clone, Debug)]
109#[serde(deny_unknown_fields)]
110pub struct NatsAuthNKey {
111    /// User.
112    ///
113    /// Conceptually, this is equivalent to a public key.
114    pub(crate) nkey: String,
115
116    /// Seed.
117    ///
118    /// Conceptually, this is equivalent to a private key.
119    pub(crate) seed: String,
120}
121
122impl NatsAuthConfig {
123    pub(crate) fn to_nats_options(&self) -> Result<async_nats::ConnectOptions, NatsConfigError> {
124        match self {
125            NatsAuthConfig::UserPassword { user_password } => {
126                Ok(async_nats::ConnectOptions::with_user_and_password(
127                    user_password.user.clone(),
128                    user_password.password.inner().to_string(),
129                ))
130            }
131            NatsAuthConfig::CredentialsFile { credentials_file } => {
132                async_nats::ConnectOptions::with_credentials(
133                    &std::fs::read_to_string(credentials_file.path.clone())
134                        .context(CredentialsFileSnafu)?,
135                )
136                .context(CredentialsFileSnafu)
137            }
138            NatsAuthConfig::Nkey { nkey } => {
139                Ok(async_nats::ConnectOptions::with_nkey(nkey.seed.clone()))
140            }
141            NatsAuthConfig::Token { token } => Ok(async_nats::ConnectOptions::with_token(
142                token.value.inner().to_string(),
143            )),
144        }
145    }
146}
147
148pub(crate) fn from_tls_auth_config(
149    connection_name: &str,
150    auth_config: &Option<NatsAuthConfig>,
151    tls_config: &Option<TlsEnableableConfig>,
152) -> Result<async_nats::ConnectOptions, NatsConfigError> {
153    let nats_options = match &auth_config {
154        None => async_nats::ConnectOptions::new(),
155        Some(auth) => auth.to_nats_options()?,
156    };
157
158    let nats_options = nats_options.name(connection_name);
159
160    match tls_config {
161        None => Ok(nats_options),
162        Some(tls_config) => {
163            let tls_enabled = tls_config.enabled.unwrap_or(false);
164            let nats_options = nats_options.require_tls(tls_enabled);
165            if !tls_enabled {
166                return Ok(nats_options);
167            }
168
169            let nats_options = match &tls_config.options.ca_file {
170                None => nats_options,
171                Some(ca_file) => nats_options.add_root_certificates(ca_file.clone()),
172            };
173
174            let nats_options = match (&tls_config.options.crt_file, &tls_config.options.key_file) {
175                (None, None) => nats_options,
176                (Some(crt_file), Some(key_file)) => {
177                    nats_options.add_client_certificate(crt_file.clone(), key_file.clone())
178                }
179                (Some(_crt_file), None) => return Err(NatsConfigError::TlsMissingKey),
180                (None, Some(_key_file)) => return Err(NatsConfigError::TlsMissingCert),
181            };
182            Ok(nats_options)
183        }
184    }
185}
186
187#[cfg(test)]
188mod tests {
189    use super::*;
190
191    fn parse_auth(s: &str) -> Result<async_nats::ConnectOptions, crate::Error> {
192        toml::from_str(s)
193            .map_err(Into::into)
194            .and_then(|config: NatsAuthConfig| config.to_nats_options().map_err(Into::into))
195    }
196
197    #[test]
198    fn auth_user_password_ok() {
199        parse_auth(
200            r#"
201            strategy = "user_password"
202            user_password.user = "username"
203            user_password.password = "password"
204        "#,
205        )
206        .unwrap();
207    }
208
209    #[test]
210    fn auth_user_password_missing_user() {
211        parse_auth(
212            r#"
213            strategy = "user_password"
214            user_password.password = "password"
215        "#,
216        )
217        .unwrap_err();
218    }
219
220    #[test]
221    fn auth_user_password_missing_password() {
222        parse_auth(
223            r#"
224            strategy = "user_password"
225            user_password.user = "username"
226        "#,
227        )
228        .unwrap_err();
229    }
230
231    #[test]
232    fn auth_user_password_missing_all() {
233        parse_auth(
234            r#"
235            strategy = "user_password"
236            token.value = "foobar"
237            "#,
238        )
239        .unwrap_err();
240    }
241
242    #[test]
243    fn auth_token_ok() {
244        parse_auth(
245            r#"
246            strategy = "token"
247            token.value = "token"
248        "#,
249        )
250        .unwrap();
251    }
252
253    #[test]
254    fn auth_token_missing() {
255        parse_auth(
256            r#"
257            strategy = "token"
258            user_password.user = "foobar"
259            "#,
260        )
261        .unwrap_err();
262    }
263
264    #[test]
265    fn auth_credentials_file_ok() {
266        parse_auth(
267            r#"
268            strategy = "credentials_file"
269            credentials_file.path = "tests/data/nats/nats.creds"
270        "#,
271        )
272        .unwrap();
273    }
274
275    #[test]
276    fn auth_credentials_file_missing() {
277        parse_auth(
278            r#"
279            strategy = "credentials_file"
280            token.value = "foobar"
281            "#,
282        )
283        .unwrap_err();
284    }
285
286    #[test]
287    fn auth_nkey_ok() {
288        parse_auth(
289            r#"
290            strategy = "nkey"
291            nkey.nkey = "UC435ZYS52HF72E2VMQF4GO6CUJOCHDUUPEBU7XDXW5AQLIC6JZ46PO5"
292            nkey.seed = "SUAAEZYNLTEA2MDTG7L5X7QODZXYHPOI2LT2KH5I4GD6YVP24SE766EGPA"
293        "#,
294        )
295        .unwrap();
296    }
297
298    #[test]
299    fn auth_nkey_missing_nkey() {
300        parse_auth(
301            r#"
302            strategy = "nkey"
303            nkey.seed = "SUAAEZYNLTEA2MDTG7L5X7QODZXYHPOI2LT2KH5I4GD6YVP24SE766EGPA"
304        "#,
305        )
306        .unwrap_err();
307    }
308
309    #[test]
310    fn auth_nkey_missing_seed() {
311        parse_auth(
312            r#"
313            strategy = "nkey"
314            nkey.nkey = "UC435ZYS52HF72E2VMQF4GO6CUJOCHDUUPEBU7XDXW5AQLIC6JZ46PO5"
315        "#,
316        )
317        .unwrap_err();
318    }
319
320    #[test]
321    fn auth_nkey_missing_both() {
322        parse_auth(
323            r#"
324            strategy = "nkey"
325            user_password.user = "foobar"
326            "#,
327        )
328        .unwrap_err();
329    }
330}