vector/sources/
nats.rs

1use chrono::Utc;
2use futures::{pin_mut, StreamExt};
3use snafu::{ResultExt, Snafu};
4use tokio_util::codec::FramedRead;
5use vector_lib::codecs::decoding::{DeserializerConfig, FramingConfig, StreamDecodingError};
6use vector_lib::configurable::configurable_component;
7use vector_lib::internal_event::{
8    ByteSize, BytesReceived, CountByteSize, EventsReceived, InternalEventHandle as _, Protocol,
9};
10use vector_lib::lookup::{lookup_v2::OptionalValuePath, owned_value_path};
11use vector_lib::{
12    config::{LegacyKey, LogNamespace},
13    EstimatedJsonEncodedSizeOf,
14};
15use vrl::value::Kind;
16
17use crate::{
18    codecs::{Decoder, DecodingConfig},
19    config::{GenerateConfig, SourceConfig, SourceContext, SourceOutput},
20    event::Event,
21    internal_events::StreamClosedError,
22    nats::{from_tls_auth_config, NatsAuthConfig, NatsConfigError},
23    serde::{default_decoding, default_framing_message_based},
24    shutdown::ShutdownSignal,
25    tls::TlsEnableableConfig,
26    SourceSender,
27};
28
29#[derive(Debug, Snafu)]
30enum BuildError {
31    #[snafu(display("NATS Config Error: {}", source))]
32    Config { source: NatsConfigError },
33    #[snafu(display("NATS Connect Error: {}", source))]
34    Connect { source: async_nats::ConnectError },
35    #[snafu(display("NATS Subscribe Error: {}", source))]
36    Subscribe { source: async_nats::SubscribeError },
37}
38
39/// Configuration for the `nats` source.
40#[configurable_component(source(
41    "nats",
42    "Read observability data from subjects on the NATS messaging system."
43))]
44#[derive(Clone, Debug, Derivative)]
45#[derivative(Default)]
46#[serde(deny_unknown_fields)]
47pub struct NatsSourceConfig {
48    /// The NATS URL to connect to.
49    ///
50    /// The URL takes the form of `nats://server:port`.
51    /// If the port is not specified it defaults to 4222.
52    #[configurable(metadata(docs::examples = "nats://demo.nats.io"))]
53    #[configurable(metadata(docs::examples = "nats://127.0.0.1:4242"))]
54    #[configurable(metadata(
55        docs::examples = "nats://localhost:4222,nats://localhost:5222,nats://localhost:6222"
56    ))]
57    url: String,
58
59    /// A [name][nats_connection_name] assigned to the NATS connection.
60    ///
61    /// [nats_connection_name]: https://docs.nats.io/using-nats/developer/connecting/name
62    #[serde(alias = "name")]
63    #[configurable(metadata(docs::examples = "vector"))]
64    connection_name: String,
65
66    /// The NATS [subject][nats_subject] to pull messages from.
67    ///
68    /// [nats_subject]: https://docs.nats.io/nats-concepts/subjects
69    #[configurable(metadata(docs::examples = "foo"))]
70    #[configurable(metadata(docs::examples = "time.us.east"))]
71    #[configurable(metadata(docs::examples = "time.*.east"))]
72    #[configurable(metadata(docs::examples = "time.>"))]
73    #[configurable(metadata(docs::examples = ">"))]
74    subject: String,
75
76    /// The NATS queue group to join.
77    queue: Option<String>,
78
79    /// The namespace to use for logs. This overrides the global setting.
80    #[configurable(metadata(docs::hidden))]
81    #[serde(default)]
82    pub log_namespace: Option<bool>,
83
84    #[configurable(derived)]
85    tls: Option<TlsEnableableConfig>,
86
87    #[configurable(derived)]
88    auth: Option<NatsAuthConfig>,
89
90    #[configurable(derived)]
91    #[serde(default = "default_framing_message_based")]
92    #[derivative(Default(value = "default_framing_message_based()"))]
93    framing: FramingConfig,
94
95    #[configurable(derived)]
96    #[serde(default = "default_decoding")]
97    #[derivative(Default(value = "default_decoding()"))]
98    decoding: DeserializerConfig,
99
100    /// The `NATS` subject key.
101    #[serde(default = "default_subject_key_field")]
102    subject_key_field: OptionalValuePath,
103
104    /// The buffer capacity of the underlying NATS subscriber.
105    ///
106    /// This value determines how many messages the NATS subscriber buffers
107    /// before incoming messages are dropped.
108    ///
109    /// See the [async_nats documentation][async_nats_subscription_capacity] for more information.
110    ///
111    /// [async_nats_subscription_capacity]: https://docs.rs/async-nats/latest/async_nats/struct.ConnectOptions.html#method.subscription_capacity
112    #[serde(default = "default_subscription_capacity")]
113    #[derivative(Default(value = "default_subscription_capacity()"))]
114    subscriber_capacity: usize,
115}
116
117fn default_subject_key_field() -> OptionalValuePath {
118    OptionalValuePath::from(owned_value_path!("subject"))
119}
120
121const fn default_subscription_capacity() -> usize {
122    65536
123}
124
125impl GenerateConfig for NatsSourceConfig {
126    fn generate_config() -> toml::Value {
127        toml::from_str(
128            r#"
129            connection_name = "vector"
130            subject = "from.vector"
131            url = "nats://127.0.0.1:4222""#,
132        )
133        .unwrap()
134    }
135}
136
137#[async_trait::async_trait]
138#[typetag::serde(name = "nats")]
139impl SourceConfig for NatsSourceConfig {
140    async fn build(&self, cx: SourceContext) -> crate::Result<super::Source> {
141        let log_namespace = cx.log_namespace(self.log_namespace);
142        let (connection, subscription) = create_subscription(self).await?;
143        let decoder =
144            DecodingConfig::new(self.framing.clone(), self.decoding.clone(), log_namespace)
145                .build()?;
146
147        Ok(Box::pin(nats_source(
148            self.clone(),
149            connection,
150            subscription,
151            decoder,
152            log_namespace,
153            cx.shutdown,
154            cx.out,
155        )))
156    }
157
158    fn outputs(&self, global_log_namespace: LogNamespace) -> Vec<SourceOutput> {
159        let log_namespace = global_log_namespace.merge(self.log_namespace);
160        let legacy_subject_key_field = self
161            .subject_key_field
162            .clone()
163            .path
164            .map(LegacyKey::InsertIfEmpty);
165        let schema_definition = self
166            .decoding
167            .schema_definition(log_namespace)
168            .with_standard_vector_source_metadata()
169            .with_source_metadata(
170                NatsSourceConfig::NAME,
171                legacy_subject_key_field,
172                &owned_value_path!("subject"),
173                Kind::bytes(),
174                None,
175            );
176
177        vec![SourceOutput::new_maybe_logs(
178            self.decoding.output_type(),
179            schema_definition,
180        )]
181    }
182
183    fn can_acknowledge(&self) -> bool {
184        false
185    }
186}
187
188impl NatsSourceConfig {
189    async fn connect(&self) -> Result<async_nats::Client, BuildError> {
190        let options: async_nats::ConnectOptions = self.try_into().context(ConfigSnafu)?;
191
192        let server_addrs = self.parse_server_addresses()?;
193        options.connect(server_addrs).await.context(ConnectSnafu)
194    }
195
196    fn parse_server_addresses(&self) -> Result<Vec<async_nats::ServerAddr>, BuildError> {
197        self.url
198            .split(',')
199            .map(|url| {
200                url.parse::<async_nats::ServerAddr>()
201                    .map_err(|_| BuildError::Connect {
202                        source: async_nats::ConnectErrorKind::ServerParse.into(),
203                    })
204            })
205            .collect()
206    }
207}
208
209impl TryFrom<&NatsSourceConfig> for async_nats::ConnectOptions {
210    type Error = NatsConfigError;
211
212    fn try_from(config: &NatsSourceConfig) -> Result<Self, Self::Error> {
213        from_tls_auth_config(&config.connection_name, &config.auth, &config.tls)
214            .map(|options| options.subscription_capacity(config.subscriber_capacity))
215    }
216}
217
218async fn nats_source(
219    config: NatsSourceConfig,
220    // Take ownership of the connection so it doesn't get dropped.
221    _connection: async_nats::Client,
222    subscriber: async_nats::Subscriber,
223    decoder: Decoder,
224    log_namespace: LogNamespace,
225    shutdown: ShutdownSignal,
226    mut out: SourceSender,
227) -> Result<(), ()> {
228    let events_received = register!(EventsReceived);
229    let stream = subscriber.take_until(shutdown);
230    pin_mut!(stream);
231    let bytes_received = register!(BytesReceived::from(Protocol::TCP));
232    while let Some(msg) = stream.next().await {
233        bytes_received.emit(ByteSize(msg.payload.len()));
234        let mut stream = FramedRead::new(msg.payload.as_ref(), decoder.clone());
235        while let Some(next) = stream.next().await {
236            match next {
237                Ok((events, _byte_size)) => {
238                    let count = events.len();
239                    let byte_size = events.estimated_json_encoded_size_of();
240                    events_received.emit(CountByteSize(count, byte_size));
241
242                    let now = Utc::now();
243
244                    let events = events.into_iter().map(|mut event| {
245                        if let Event::Log(ref mut log) = event {
246                            log_namespace.insert_standard_vector_source_metadata(
247                                log,
248                                NatsSourceConfig::NAME,
249                                now,
250                            );
251
252                            let legacy_subject_key_field = config
253                                .subject_key_field
254                                .path
255                                .as_ref()
256                                .map(LegacyKey::InsertIfEmpty);
257                            log_namespace.insert_source_metadata(
258                                NatsSourceConfig::NAME,
259                                log,
260                                legacy_subject_key_field,
261                                &owned_value_path!("subject"),
262                                msg.subject.as_str(),
263                            )
264                        }
265                        event
266                    });
267
268                    out.send_batch(events).await.map_err(|_| {
269                        emit!(StreamClosedError { count });
270                    })?;
271                }
272                Err(error) => {
273                    // Error is logged by `crate::codecs`, no further
274                    // handling is needed here.
275                    if !error.can_continue() {
276                        break;
277                    }
278                }
279            }
280        }
281    }
282    Ok(())
283}
284
285async fn create_subscription(
286    config: &NatsSourceConfig,
287) -> Result<(async_nats::Client, async_nats::Subscriber), BuildError> {
288    let nc = config.connect().await?;
289
290    let subscription = match &config.queue {
291        None => nc.subscribe(config.subject.clone()).await,
292        Some(queue) => {
293            nc.queue_subscribe(config.subject.clone(), queue.clone())
294                .await
295        }
296    };
297
298    let subscription = subscription.context(SubscribeSnafu)?;
299
300    Ok((nc, subscription))
301}
302
303#[cfg(test)]
304mod tests {
305    #![allow(clippy::print_stdout)] //tests
306
307    use vector_lib::lookup::{owned_value_path, OwnedTargetPath};
308    use vector_lib::schema::Definition;
309    use vrl::value::{kind::Collection, Kind};
310
311    use super::*;
312
313    #[test]
314    fn generate_config() {
315        crate::test_util::test_generate_config::<NatsSourceConfig>();
316    }
317
318    #[test]
319    fn output_schema_definition_vector_namespace() {
320        let config = NatsSourceConfig {
321            log_namespace: Some(true),
322            subject_key_field: default_subject_key_field(),
323            ..Default::default()
324        };
325
326        let definitions = config
327            .outputs(LogNamespace::Vector)
328            .remove(0)
329            .schema_definition(true);
330
331        let expected_definition =
332            Definition::new_with_default_metadata(Kind::bytes(), [LogNamespace::Vector])
333                .with_meaning(OwnedTargetPath::event_root(), "message")
334                .with_metadata_field(
335                    &owned_value_path!("vector", "source_type"),
336                    Kind::bytes(),
337                    None,
338                )
339                .with_metadata_field(
340                    &owned_value_path!("vector", "ingest_timestamp"),
341                    Kind::timestamp(),
342                    None,
343                )
344                .with_metadata_field(&owned_value_path!("nats", "subject"), Kind::bytes(), None);
345
346        assert_eq!(definitions, Some(expected_definition));
347    }
348
349    #[test]
350    fn output_schema_definition_legacy_namespace() {
351        let config = NatsSourceConfig {
352            subject_key_field: default_subject_key_field(),
353            ..Default::default()
354        };
355        let definitions = config
356            .outputs(LogNamespace::Legacy)
357            .remove(0)
358            .schema_definition(true);
359
360        let expected_definition = Definition::new_with_default_metadata(
361            Kind::object(Collection::empty()),
362            [LogNamespace::Legacy],
363        )
364        .with_event_field(
365            &owned_value_path!("message"),
366            Kind::bytes(),
367            Some("message"),
368        )
369        .with_event_field(&owned_value_path!("timestamp"), Kind::timestamp(), None)
370        .with_event_field(&owned_value_path!("source_type"), Kind::bytes(), None)
371        .with_event_field(&owned_value_path!("subject"), Kind::bytes(), None);
372
373        assert_eq!(definitions, Some(expected_definition));
374    }
375}
376
377#[cfg(feature = "nats-integration-tests")]
378#[cfg(test)]
379mod integration_tests {
380    #![allow(clippy::print_stdout)] //tests
381
382    use bytes::Bytes;
383    use vector_lib::config::log_schema;
384
385    use super::*;
386    use crate::nats::{NatsAuthCredentialsFile, NatsAuthNKey, NatsAuthToken, NatsAuthUserPassword};
387    use crate::test_util::{
388        collect_n,
389        components::{assert_source_compliance, SOURCE_TAGS},
390        random_string,
391    };
392    use crate::tls::TlsConfig;
393
394    async fn publish_and_check(conf: NatsSourceConfig) -> Result<(), BuildError> {
395        let subject = conf.subject.clone();
396        let (nc, sub) = create_subscription(&conf).await?;
397        let nc_pub = nc.clone();
398        let msg = "my message";
399
400        let events = assert_source_compliance(&SOURCE_TAGS, async move {
401            let (tx, rx) = SourceSender::new_test();
402            let decoder = DecodingConfig::new(
403                conf.framing.clone(),
404                conf.decoding.clone(),
405                LogNamespace::Legacy,
406            )
407            .build()
408            .unwrap();
409            tokio::spawn(nats_source(
410                conf.clone(),
411                nc,
412                sub,
413                decoder,
414                LogNamespace::Legacy,
415                ShutdownSignal::noop(),
416                tx,
417            ));
418            nc_pub
419                .publish(subject, Bytes::from_static(msg.as_bytes()))
420                .await
421                .unwrap();
422
423            collect_n(rx, 1).await
424        })
425        .await;
426
427        println!("Received event  {:?}", events[0].as_log());
428        assert_eq!(
429            events[0].as_log()[log_schema().message_key().unwrap().to_string()],
430            msg.into()
431        );
432        Ok(())
433    }
434
435    #[tokio::test]
436    async fn nats_no_auth() {
437        let subject = format!("test-{}", random_string(10));
438        let url =
439            std::env::var("NATS_ADDRESS").unwrap_or_else(|_| String::from("nats://localhost:4222"));
440
441        let conf = NatsSourceConfig {
442            connection_name: "".to_owned(),
443            subject: subject.clone(),
444            url,
445            queue: None,
446            framing: default_framing_message_based(),
447            decoding: default_decoding(),
448            tls: None,
449            auth: None,
450            log_namespace: None,
451            subject_key_field: default_subject_key_field(),
452            ..Default::default()
453        };
454
455        let r = publish_and_check(conf).await;
456        assert!(
457            r.is_ok(),
458            "publish_and_check failed, expected Ok(()), got: {r:?}"
459        );
460    }
461
462    #[tokio::test]
463    async fn nats_userpass_auth_valid() {
464        let subject = format!("test-{}", random_string(10));
465        let url = std::env::var("NATS_USERPASS_ADDRESS")
466            .unwrap_or_else(|_| String::from("nats://localhost:4222"));
467
468        let conf = NatsSourceConfig {
469            connection_name: "".to_owned(),
470            subject: subject.clone(),
471            url,
472            queue: None,
473            framing: default_framing_message_based(),
474            decoding: default_decoding(),
475            tls: None,
476            auth: Some(NatsAuthConfig::UserPassword {
477                user_password: NatsAuthUserPassword {
478                    user: "natsuser".to_string(),
479                    password: "natspass".to_string().into(),
480                },
481            }),
482            log_namespace: None,
483            subject_key_field: default_subject_key_field(),
484            ..Default::default()
485        };
486
487        let r = publish_and_check(conf).await;
488        assert!(
489            r.is_ok(),
490            "publish_and_check failed, expected Ok(()), got: {r:?}"
491        );
492    }
493
494    #[tokio::test]
495    async fn nats_userpass_auth_invalid() {
496        let subject = format!("test-{}", random_string(10));
497        let url = std::env::var("NATS_USERPASS_ADDRESS")
498            .unwrap_or_else(|_| String::from("nats://localhost:4222"));
499
500        let conf = NatsSourceConfig {
501            connection_name: "".to_owned(),
502            subject: subject.clone(),
503            url,
504            queue: None,
505            framing: default_framing_message_based(),
506            decoding: default_decoding(),
507            tls: None,
508            auth: Some(NatsAuthConfig::UserPassword {
509                user_password: NatsAuthUserPassword {
510                    user: "natsuser".to_string(),
511                    password: "wrongpass".to_string().into(),
512                },
513            }),
514            log_namespace: None,
515            subject_key_field: default_subject_key_field(),
516            ..Default::default()
517        };
518
519        let r = publish_and_check(conf).await;
520        assert!(
521            matches!(r, Err(BuildError::Connect { .. })),
522            "publish_and_check failed, expected BuildError::Connect, got: {r:?}"
523        );
524    }
525
526    #[tokio::test]
527    async fn nats_token_auth_valid() {
528        let subject = format!("test-{}", random_string(10));
529        let url = std::env::var("NATS_TOKEN_ADDRESS")
530            .unwrap_or_else(|_| String::from("nats://localhost:4222"));
531
532        let conf = NatsSourceConfig {
533            connection_name: "".to_owned(),
534            subject: subject.clone(),
535            url,
536            queue: None,
537            framing: default_framing_message_based(),
538            decoding: default_decoding(),
539            tls: None,
540            auth: Some(NatsAuthConfig::Token {
541                token: NatsAuthToken {
542                    value: "secret".to_string().into(),
543                },
544            }),
545            log_namespace: None,
546            subject_key_field: default_subject_key_field(),
547            ..Default::default()
548        };
549
550        let r = publish_and_check(conf).await;
551        assert!(
552            r.is_ok(),
553            "publish_and_check failed, expected Ok(()), got: {r:?}"
554        );
555    }
556
557    #[tokio::test]
558    async fn nats_token_auth_invalid() {
559        let subject = format!("test-{}", random_string(10));
560        let url = std::env::var("NATS_TOKEN_ADDRESS")
561            .unwrap_or_else(|_| String::from("nats://localhost:4222"));
562
563        let conf = NatsSourceConfig {
564            connection_name: "".to_owned(),
565            subject: subject.clone(),
566            url,
567            queue: None,
568            framing: default_framing_message_based(),
569            decoding: default_decoding(),
570            tls: None,
571            auth: Some(NatsAuthConfig::Token {
572                token: NatsAuthToken {
573                    value: "wrongsecret".to_string().into(),
574                },
575            }),
576            log_namespace: None,
577            subject_key_field: default_subject_key_field(),
578            ..Default::default()
579        };
580
581        let r = publish_and_check(conf).await;
582        assert!(
583            matches!(r, Err(BuildError::Connect { .. })),
584            "publish_and_check failed, expected BuildError::Connect, got: {r:?}"
585        );
586    }
587
588    #[tokio::test]
589    async fn nats_nkey_auth_valid() {
590        let subject = format!("test-{}", random_string(10));
591        let url = std::env::var("NATS_NKEY_ADDRESS")
592            .unwrap_or_else(|_| String::from("nats://localhost:4222"));
593
594        let conf = NatsSourceConfig {
595            connection_name: "".to_owned(),
596            subject: subject.clone(),
597            url,
598            queue: None,
599            framing: default_framing_message_based(),
600            decoding: default_decoding(),
601            tls: None,
602            auth: Some(NatsAuthConfig::Nkey {
603                nkey: NatsAuthNKey {
604                    nkey: "UD345ZYSUJQD7PNCTWQPINYSO3VH4JBSADBSYUZOBT666DRASFRAWAWT".into(),
605                    seed: "SUANIRXEZUROTXNFN3TJYMT27K7ZZVMD46FRIHF6KXKS4KGNVBS57YAFGY".into(),
606                },
607            }),
608            log_namespace: None,
609            subject_key_field: default_subject_key_field(),
610            ..Default::default()
611        };
612
613        let r = publish_and_check(conf).await;
614        assert!(
615            r.is_ok(),
616            "publish_and_check failed, expected Ok(()), got: {r:?}"
617        );
618    }
619
620    #[tokio::test]
621    async fn nats_nkey_auth_invalid() {
622        let subject = format!("test-{}", random_string(10));
623        let url = std::env::var("NATS_NKEY_ADDRESS")
624            .unwrap_or_else(|_| String::from("nats://localhost:4222"));
625
626        let conf = NatsSourceConfig {
627            connection_name: "".to_owned(),
628            subject: subject.clone(),
629            url,
630            queue: None,
631            framing: default_framing_message_based(),
632            decoding: default_decoding(),
633            tls: None,
634            auth: Some(NatsAuthConfig::Nkey {
635                nkey: NatsAuthNKey {
636                    nkey: "UAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA".into(),
637                    seed: "SBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBB".into(),
638                },
639            }),
640            log_namespace: None,
641            subject_key_field: default_subject_key_field(),
642            ..Default::default()
643        };
644
645        let r = publish_and_check(conf).await;
646        assert!(
647            matches!(r, Err(BuildError::Connect { .. })),
648            "publish_and_check failed, expected BuildError::Config, got: {r:?}"
649        );
650    }
651
652    #[tokio::test]
653    async fn nats_tls_valid() {
654        let subject = format!("test-{}", random_string(10));
655        let url = std::env::var("NATS_TLS_ADDRESS")
656            .unwrap_or_else(|_| String::from("nats://localhost:4222"));
657
658        let conf = NatsSourceConfig {
659            connection_name: "".to_owned(),
660            subject: subject.clone(),
661            url,
662            queue: None,
663            framing: default_framing_message_based(),
664            decoding: default_decoding(),
665            tls: Some(TlsEnableableConfig {
666                enabled: Some(true),
667                options: TlsConfig {
668                    ca_file: Some("tests/data/nats/rootCA.pem".into()),
669                    ..Default::default()
670                },
671            }),
672            auth: None,
673            log_namespace: None,
674            subject_key_field: default_subject_key_field(),
675            ..Default::default()
676        };
677
678        let r = publish_and_check(conf).await;
679        assert!(
680            r.is_ok(),
681            "publish_and_check failed, expected Ok(()), got: {r:?}"
682        );
683    }
684
685    #[tokio::test]
686    async fn nats_tls_invalid() {
687        let subject = format!("test-{}", random_string(10));
688        let url = std::env::var("NATS_TLS_ADDRESS")
689            .unwrap_or_else(|_| String::from("nats://localhost:4222"));
690
691        let conf = NatsSourceConfig {
692            connection_name: "".to_owned(),
693            subject: subject.clone(),
694            url,
695            queue: None,
696            framing: default_framing_message_based(),
697            decoding: default_decoding(),
698            tls: None,
699            auth: None,
700            log_namespace: None,
701            subject_key_field: default_subject_key_field(),
702            ..Default::default()
703        };
704
705        let r = publish_and_check(conf).await;
706        assert!(
707            matches!(r, Err(BuildError::Connect { .. })),
708            "publish_and_check failed, expected BuildError::Connect, got: {r:?}"
709        );
710    }
711
712    #[tokio::test]
713    async fn nats_tls_client_cert_valid() {
714        let subject = format!("test-{}", random_string(10));
715        let url = std::env::var("NATS_TLS_CLIENT_CERT_ADDRESS")
716            .unwrap_or_else(|_| String::from("nats://localhost:4222"));
717
718        let conf = NatsSourceConfig {
719            connection_name: "".to_owned(),
720            subject: subject.clone(),
721            url,
722            queue: None,
723            framing: default_framing_message_based(),
724            decoding: default_decoding(),
725            tls: Some(TlsEnableableConfig {
726                enabled: Some(true),
727                options: TlsConfig {
728                    ca_file: Some("tests/data/nats/rootCA.pem".into()),
729                    crt_file: Some("tests/data/nats/nats-client.pem".into()),
730                    key_file: Some("tests/data/nats/nats-client.key".into()),
731                    ..Default::default()
732                },
733            }),
734            auth: None,
735            log_namespace: None,
736            subject_key_field: default_subject_key_field(),
737            ..Default::default()
738        };
739
740        let r = publish_and_check(conf).await;
741        assert!(
742            r.is_ok(),
743            "publish_and_check failed, expected Ok(()), got: {r:?}"
744        );
745    }
746
747    #[tokio::test]
748    async fn nats_tls_client_cert_invalid() {
749        let subject = format!("test-{}", random_string(10));
750        let url = std::env::var("NATS_TLS_CLIENT_CERT_ADDRESS")
751            .unwrap_or_else(|_| String::from("nats://localhost:4222"));
752
753        let conf = NatsSourceConfig {
754            connection_name: "".to_owned(),
755            subject: subject.clone(),
756            url,
757            queue: None,
758            framing: default_framing_message_based(),
759            decoding: default_decoding(),
760            tls: Some(TlsEnableableConfig {
761                enabled: Some(true),
762                options: TlsConfig {
763                    ca_file: Some("tests/data/nats/rootCA.pem".into()),
764                    ..Default::default()
765                },
766            }),
767            auth: None,
768            log_namespace: None,
769            subject_key_field: default_subject_key_field(),
770            ..Default::default()
771        };
772
773        let r = publish_and_check(conf).await;
774        assert!(
775            matches!(r, Err(BuildError::Connect { .. })),
776            "publish_and_check failed, expected BuildError::Connect, got: {r:?}"
777        );
778    }
779
780    #[tokio::test]
781    async fn nats_tls_jwt_auth_valid() {
782        let subject = format!("test-{}", random_string(10));
783        let url = std::env::var("NATS_JWT_ADDRESS")
784            .unwrap_or_else(|_| String::from("nats://localhost:4222"));
785
786        let conf = NatsSourceConfig {
787            connection_name: "".to_owned(),
788            subject: subject.clone(),
789            url,
790            queue: None,
791            framing: default_framing_message_based(),
792            decoding: default_decoding(),
793            tls: Some(TlsEnableableConfig {
794                enabled: Some(true),
795                options: TlsConfig {
796                    ca_file: Some("tests/data/nats/rootCA.pem".into()),
797                    ..Default::default()
798                },
799            }),
800            auth: Some(NatsAuthConfig::CredentialsFile {
801                credentials_file: NatsAuthCredentialsFile {
802                    path: "tests/data/nats/nats.creds".into(),
803                },
804            }),
805            log_namespace: None,
806            subject_key_field: default_subject_key_field(),
807            ..Default::default()
808        };
809
810        let r = publish_and_check(conf).await;
811        assert!(
812            r.is_ok(),
813            "publish_and_check failed, expected Ok(()), got: {r:?}"
814        );
815    }
816
817    #[tokio::test]
818    async fn nats_tls_jwt_auth_invalid() {
819        let subject = format!("test-{}", random_string(10));
820        let url = std::env::var("NATS_JWT_ADDRESS")
821            .unwrap_or_else(|_| String::from("nats://localhost:4222"));
822
823        let conf = NatsSourceConfig {
824            connection_name: "".to_owned(),
825            subject: subject.clone(),
826            url,
827            queue: None,
828            framing: default_framing_message_based(),
829            decoding: default_decoding(),
830            tls: Some(TlsEnableableConfig {
831                enabled: Some(true),
832                options: TlsConfig {
833                    ca_file: Some("tests/data/nats/rootCA.pem".into()),
834                    ..Default::default()
835                },
836            }),
837            auth: Some(NatsAuthConfig::CredentialsFile {
838                credentials_file: NatsAuthCredentialsFile {
839                    path: "tests/data/nats/nats-bad.creds".into(),
840                },
841            }),
842            log_namespace: None,
843            subject_key_field: default_subject_key_field(),
844            ..Default::default()
845        };
846
847        let r = publish_and_check(conf).await;
848        assert!(
849            matches!(r, Err(BuildError::Config { .. })),
850            "publish_and_check failed, expected BuildError::Config, got: {r:?}"
851        );
852    }
853
854    #[tokio::test]
855    async fn nats_multiple_urls_valid() {
856        let subject = format!("test-{}", random_string(10));
857
858        let conf = NatsSourceConfig {
859            connection_name: "".to_owned(),
860            subject: subject.clone(),
861            url: "nats://nats:4222,nats://demo.nats.io:4222".to_string(),
862            queue: None,
863            framing: default_framing_message_based(),
864            decoding: default_decoding(),
865            tls: None,
866            auth: None,
867            log_namespace: None,
868            subject_key_field: default_subject_key_field(),
869            ..Default::default()
870        };
871
872        let r = publish_and_check(conf).await;
873        assert!(
874            r.is_ok(),
875            "publish_and_check failed for multiple URLs, expected Ok(()), got: {r:?}"
876        );
877    }
878
879    #[tokio::test]
880    async fn nats_multiple_urls_invalid() {
881        let subject = format!("test-{}", random_string(10));
882
883        let conf = NatsSourceConfig {
884            connection_name: "".to_owned(),
885            subject: subject.clone(),
886            url: "http://invalid-url,nats://:invalid@localhost:4222".to_string(),
887            queue: None,
888            framing: default_framing_message_based(),
889            decoding: default_decoding(),
890            tls: None,
891            auth: None,
892            log_namespace: None,
893            subject_key_field: default_subject_key_field(),
894            ..Default::default()
895        };
896
897        let r = publish_and_check(conf).await;
898        assert!(
899            matches!(r, Err(BuildError::Connect { .. })),
900            "publish_and_check failed for bad URLs, expected BuildError::Connect, got: {r:?}"
901        );
902    }
903}