1use chrono::Utc;
2use futures::{pin_mut, StreamExt};
3use snafu::{ResultExt, Snafu};
4use tokio_util::codec::FramedRead;
5use vector_lib::codecs::decoding::{DeserializerConfig, FramingConfig, StreamDecodingError};
6use vector_lib::configurable::configurable_component;
7use vector_lib::internal_event::{
8 ByteSize, BytesReceived, CountByteSize, EventsReceived, InternalEventHandle as _, Protocol,
9};
10use vector_lib::lookup::{lookup_v2::OptionalValuePath, owned_value_path};
11use vector_lib::{
12 config::{LegacyKey, LogNamespace},
13 EstimatedJsonEncodedSizeOf,
14};
15use vrl::value::Kind;
16
17use crate::{
18 codecs::{Decoder, DecodingConfig},
19 config::{GenerateConfig, SourceConfig, SourceContext, SourceOutput},
20 event::Event,
21 internal_events::StreamClosedError,
22 nats::{from_tls_auth_config, NatsAuthConfig, NatsConfigError},
23 serde::{default_decoding, default_framing_message_based},
24 shutdown::ShutdownSignal,
25 tls::TlsEnableableConfig,
26 SourceSender,
27};
28
/// Errors that can occur while connecting to NATS and creating the subscription.
#[derive(Debug, Snafu)]
enum BuildError {
    /// The source configuration could not be turned into NATS connect options.
    #[snafu(display("NATS Config Error: {}", source))]
    Config { source: NatsConfigError },
    /// A server address could not be parsed, or the connection attempt failed.
    #[snafu(display("NATS Connect Error: {}", source))]
    Connect { source: async_nats::ConnectError },
    /// Subscribing to the configured subject failed.
    #[snafu(display("NATS Subscribe Error: {}", source))]
    Subscribe { source: async_nats::SubscribeError },
}
38
/// Configuration for the `nats` source.
#[configurable_component(source(
    "nats",
    "Read observability data from subjects on the NATS messaging system."
))]
#[derive(Clone, Debug, Derivative)]
#[derivative(Default)]
#[serde(deny_unknown_fields)]
pub struct NatsSourceConfig {
    /// The NATS server URL to connect to.
    ///
    /// Several servers can be given as a comma-separated list; each entry is
    /// parsed as a separate server address.
    #[configurable(metadata(docs::examples = "nats://demo.nats.io"))]
    #[configurable(metadata(docs::examples = "nats://127.0.0.1:4242"))]
    #[configurable(metadata(
        docs::examples = "nats://localhost:4222,nats://localhost:5222,nats://localhost:6222"
    ))]
    url: String,

    /// The name used when building the NATS connection options.
    ///
    /// This option is also accepted under the key `name`.
    #[serde(alias = "name")]
    #[configurable(metadata(docs::examples = "vector"))]
    connection_name: String,

    /// The NATS subject to subscribe to.
    #[configurable(metadata(docs::examples = "foo"))]
    #[configurable(metadata(docs::examples = "time.us.east"))]
    #[configurable(metadata(docs::examples = "time.*.east"))]
    #[configurable(metadata(docs::examples = "time.>"))]
    #[configurable(metadata(docs::examples = ">"))]
    subject: String,

    /// The NATS queue group to join.
    ///
    /// When set, the source uses a queue subscription instead of a plain one.
    queue: Option<String>,

    /// The namespace to use for logs. When set, this is merged over the global setting.
    #[configurable(metadata(docs::hidden))]
    #[serde(default)]
    pub log_namespace: Option<bool>,

    #[configurable(derived)]
    tls: Option<TlsEnableableConfig>,

    #[configurable(derived)]
    auth: Option<NatsAuthConfig>,

    #[configurable(derived)]
    #[serde(default = "default_framing_message_based")]
    #[derivative(Default(value = "default_framing_message_based()"))]
    framing: FramingConfig,

    #[configurable(derived)]
    #[serde(default = "default_decoding")]
    #[derivative(Default(value = "default_decoding()"))]
    decoding: DeserializerConfig,

    /// The event field in which the NATS subject is stored as a legacy key.
    ///
    /// The key is registered as insert-if-empty. Defaults to `subject` when deserialized.
    #[serde(default = "default_subject_key_field")]
    subject_key_field: OptionalValuePath,

    /// The capacity handed to the NATS client's `subscription_capacity` connect option.
    ///
    /// Defaults to 65536.
    #[serde(default = "default_subscription_capacity")]
    #[derivative(Default(value = "default_subscription_capacity()"))]
    subscriber_capacity: usize,
}
116
117fn default_subject_key_field() -> OptionalValuePath {
118 OptionalValuePath::from(owned_value_path!("subject"))
119}
120
/// Default value for `subscriber_capacity`: 65536 (2^16).
const fn default_subscription_capacity() -> usize {
    1 << 16
}
124
125impl GenerateConfig for NatsSourceConfig {
126 fn generate_config() -> toml::Value {
127 toml::from_str(
128 r#"
129 connection_name = "vector"
130 subject = "from.vector"
131 url = "nats://127.0.0.1:4222""#,
132 )
133 .unwrap()
134 }
135}
136
137#[async_trait::async_trait]
138#[typetag::serde(name = "nats")]
139impl SourceConfig for NatsSourceConfig {
140 async fn build(&self, cx: SourceContext) -> crate::Result<super::Source> {
141 let log_namespace = cx.log_namespace(self.log_namespace);
142 let (connection, subscription) = create_subscription(self).await?;
143 let decoder =
144 DecodingConfig::new(self.framing.clone(), self.decoding.clone(), log_namespace)
145 .build()?;
146
147 Ok(Box::pin(nats_source(
148 self.clone(),
149 connection,
150 subscription,
151 decoder,
152 log_namespace,
153 cx.shutdown,
154 cx.out,
155 )))
156 }
157
158 fn outputs(&self, global_log_namespace: LogNamespace) -> Vec<SourceOutput> {
159 let log_namespace = global_log_namespace.merge(self.log_namespace);
160 let legacy_subject_key_field = self
161 .subject_key_field
162 .clone()
163 .path
164 .map(LegacyKey::InsertIfEmpty);
165 let schema_definition = self
166 .decoding
167 .schema_definition(log_namespace)
168 .with_standard_vector_source_metadata()
169 .with_source_metadata(
170 NatsSourceConfig::NAME,
171 legacy_subject_key_field,
172 &owned_value_path!("subject"),
173 Kind::bytes(),
174 None,
175 );
176
177 vec![SourceOutput::new_maybe_logs(
178 self.decoding.output_type(),
179 schema_definition,
180 )]
181 }
182
183 fn can_acknowledge(&self) -> bool {
184 false
185 }
186}
187
188impl NatsSourceConfig {
189 async fn connect(&self) -> Result<async_nats::Client, BuildError> {
190 let options: async_nats::ConnectOptions = self.try_into().context(ConfigSnafu)?;
191
192 let server_addrs = self.parse_server_addresses()?;
193 options.connect(server_addrs).await.context(ConnectSnafu)
194 }
195
196 fn parse_server_addresses(&self) -> Result<Vec<async_nats::ServerAddr>, BuildError> {
197 self.url
198 .split(',')
199 .map(|url| {
200 url.parse::<async_nats::ServerAddr>()
201 .map_err(|_| BuildError::Connect {
202 source: async_nats::ConnectErrorKind::ServerParse.into(),
203 })
204 })
205 .collect()
206 }
207}
208
209impl TryFrom<&NatsSourceConfig> for async_nats::ConnectOptions {
210 type Error = NatsConfigError;
211
212 fn try_from(config: &NatsSourceConfig) -> Result<Self, Self::Error> {
213 from_tls_auth_config(&config.connection_name, &config.auth, &config.tls)
214 .map(|options| options.subscription_capacity(config.subscriber_capacity))
215 }
216}
217
218async fn nats_source(
219 config: NatsSourceConfig,
220 _connection: async_nats::Client,
222 subscriber: async_nats::Subscriber,
223 decoder: Decoder,
224 log_namespace: LogNamespace,
225 shutdown: ShutdownSignal,
226 mut out: SourceSender,
227) -> Result<(), ()> {
228 let events_received = register!(EventsReceived);
229 let stream = subscriber.take_until(shutdown);
230 pin_mut!(stream);
231 let bytes_received = register!(BytesReceived::from(Protocol::TCP));
232 while let Some(msg) = stream.next().await {
233 bytes_received.emit(ByteSize(msg.payload.len()));
234 let mut stream = FramedRead::new(msg.payload.as_ref(), decoder.clone());
235 while let Some(next) = stream.next().await {
236 match next {
237 Ok((events, _byte_size)) => {
238 let count = events.len();
239 let byte_size = events.estimated_json_encoded_size_of();
240 events_received.emit(CountByteSize(count, byte_size));
241
242 let now = Utc::now();
243
244 let events = events.into_iter().map(|mut event| {
245 if let Event::Log(ref mut log) = event {
246 log_namespace.insert_standard_vector_source_metadata(
247 log,
248 NatsSourceConfig::NAME,
249 now,
250 );
251
252 let legacy_subject_key_field = config
253 .subject_key_field
254 .path
255 .as_ref()
256 .map(LegacyKey::InsertIfEmpty);
257 log_namespace.insert_source_metadata(
258 NatsSourceConfig::NAME,
259 log,
260 legacy_subject_key_field,
261 &owned_value_path!("subject"),
262 msg.subject.as_str(),
263 )
264 }
265 event
266 });
267
268 out.send_batch(events).await.map_err(|_| {
269 emit!(StreamClosedError { count });
270 })?;
271 }
272 Err(error) => {
273 if !error.can_continue() {
276 break;
277 }
278 }
279 }
280 }
281 }
282 Ok(())
283}
284
285async fn create_subscription(
286 config: &NatsSourceConfig,
287) -> Result<(async_nats::Client, async_nats::Subscriber), BuildError> {
288 let nc = config.connect().await?;
289
290 let subscription = match &config.queue {
291 None => nc.subscribe(config.subject.clone()).await,
292 Some(queue) => {
293 nc.queue_subscribe(config.subject.clone(), queue.clone())
294 .await
295 }
296 };
297
298 let subscription = subscription.context(SubscribeSnafu)?;
299
300 Ok((nc, subscription))
301}
302
303#[cfg(test)]
304mod tests {
305 #![allow(clippy::print_stdout)] use vector_lib::lookup::{owned_value_path, OwnedTargetPath};
308 use vector_lib::schema::Definition;
309 use vrl::value::{kind::Collection, Kind};
310
311 use super::*;
312
313 #[test]
314 fn generate_config() {
315 crate::test_util::test_generate_config::<NatsSourceConfig>();
316 }
317
318 #[test]
319 fn output_schema_definition_vector_namespace() {
320 let config = NatsSourceConfig {
321 log_namespace: Some(true),
322 subject_key_field: default_subject_key_field(),
323 ..Default::default()
324 };
325
326 let definitions = config
327 .outputs(LogNamespace::Vector)
328 .remove(0)
329 .schema_definition(true);
330
331 let expected_definition =
332 Definition::new_with_default_metadata(Kind::bytes(), [LogNamespace::Vector])
333 .with_meaning(OwnedTargetPath::event_root(), "message")
334 .with_metadata_field(
335 &owned_value_path!("vector", "source_type"),
336 Kind::bytes(),
337 None,
338 )
339 .with_metadata_field(
340 &owned_value_path!("vector", "ingest_timestamp"),
341 Kind::timestamp(),
342 None,
343 )
344 .with_metadata_field(&owned_value_path!("nats", "subject"), Kind::bytes(), None);
345
346 assert_eq!(definitions, Some(expected_definition));
347 }
348
349 #[test]
350 fn output_schema_definition_legacy_namespace() {
351 let config = NatsSourceConfig {
352 subject_key_field: default_subject_key_field(),
353 ..Default::default()
354 };
355 let definitions = config
356 .outputs(LogNamespace::Legacy)
357 .remove(0)
358 .schema_definition(true);
359
360 let expected_definition = Definition::new_with_default_metadata(
361 Kind::object(Collection::empty()),
362 [LogNamespace::Legacy],
363 )
364 .with_event_field(
365 &owned_value_path!("message"),
366 Kind::bytes(),
367 Some("message"),
368 )
369 .with_event_field(&owned_value_path!("timestamp"), Kind::timestamp(), None)
370 .with_event_field(&owned_value_path!("source_type"), Kind::bytes(), None)
371 .with_event_field(&owned_value_path!("subject"), Kind::bytes(), None);
372
373 assert_eq!(definitions, Some(expected_definition));
374 }
375}
376
377#[cfg(feature = "nats-integration-tests")]
378#[cfg(test)]
379mod integration_tests {
380 #![allow(clippy::print_stdout)] use bytes::Bytes;
383 use vector_lib::config::log_schema;
384
385 use super::*;
386 use crate::nats::{NatsAuthCredentialsFile, NatsAuthNKey, NatsAuthToken, NatsAuthUserPassword};
387 use crate::test_util::{
388 collect_n,
389 components::{assert_source_compliance, SOURCE_TAGS},
390 random_string,
391 };
392 use crate::tls::TlsConfig;
393
394 async fn publish_and_check(conf: NatsSourceConfig) -> Result<(), BuildError> {
395 let subject = conf.subject.clone();
396 let (nc, sub) = create_subscription(&conf).await?;
397 let nc_pub = nc.clone();
398 let msg = "my message";
399
400 let events = assert_source_compliance(&SOURCE_TAGS, async move {
401 let (tx, rx) = SourceSender::new_test();
402 let decoder = DecodingConfig::new(
403 conf.framing.clone(),
404 conf.decoding.clone(),
405 LogNamespace::Legacy,
406 )
407 .build()
408 .unwrap();
409 tokio::spawn(nats_source(
410 conf.clone(),
411 nc,
412 sub,
413 decoder,
414 LogNamespace::Legacy,
415 ShutdownSignal::noop(),
416 tx,
417 ));
418 nc_pub
419 .publish(subject, Bytes::from_static(msg.as_bytes()))
420 .await
421 .unwrap();
422
423 collect_n(rx, 1).await
424 })
425 .await;
426
427 println!("Received event {:?}", events[0].as_log());
428 assert_eq!(
429 events[0].as_log()[log_schema().message_key().unwrap().to_string()],
430 msg.into()
431 );
432 Ok(())
433 }
434
435 #[tokio::test]
436 async fn nats_no_auth() {
437 let subject = format!("test-{}", random_string(10));
438 let url =
439 std::env::var("NATS_ADDRESS").unwrap_or_else(|_| String::from("nats://localhost:4222"));
440
441 let conf = NatsSourceConfig {
442 connection_name: "".to_owned(),
443 subject: subject.clone(),
444 url,
445 queue: None,
446 framing: default_framing_message_based(),
447 decoding: default_decoding(),
448 tls: None,
449 auth: None,
450 log_namespace: None,
451 subject_key_field: default_subject_key_field(),
452 ..Default::default()
453 };
454
455 let r = publish_and_check(conf).await;
456 assert!(
457 r.is_ok(),
458 "publish_and_check failed, expected Ok(()), got: {r:?}"
459 );
460 }
461
462 #[tokio::test]
463 async fn nats_userpass_auth_valid() {
464 let subject = format!("test-{}", random_string(10));
465 let url = std::env::var("NATS_USERPASS_ADDRESS")
466 .unwrap_or_else(|_| String::from("nats://localhost:4222"));
467
468 let conf = NatsSourceConfig {
469 connection_name: "".to_owned(),
470 subject: subject.clone(),
471 url,
472 queue: None,
473 framing: default_framing_message_based(),
474 decoding: default_decoding(),
475 tls: None,
476 auth: Some(NatsAuthConfig::UserPassword {
477 user_password: NatsAuthUserPassword {
478 user: "natsuser".to_string(),
479 password: "natspass".to_string().into(),
480 },
481 }),
482 log_namespace: None,
483 subject_key_field: default_subject_key_field(),
484 ..Default::default()
485 };
486
487 let r = publish_and_check(conf).await;
488 assert!(
489 r.is_ok(),
490 "publish_and_check failed, expected Ok(()), got: {r:?}"
491 );
492 }
493
494 #[tokio::test]
495 async fn nats_userpass_auth_invalid() {
496 let subject = format!("test-{}", random_string(10));
497 let url = std::env::var("NATS_USERPASS_ADDRESS")
498 .unwrap_or_else(|_| String::from("nats://localhost:4222"));
499
500 let conf = NatsSourceConfig {
501 connection_name: "".to_owned(),
502 subject: subject.clone(),
503 url,
504 queue: None,
505 framing: default_framing_message_based(),
506 decoding: default_decoding(),
507 tls: None,
508 auth: Some(NatsAuthConfig::UserPassword {
509 user_password: NatsAuthUserPassword {
510 user: "natsuser".to_string(),
511 password: "wrongpass".to_string().into(),
512 },
513 }),
514 log_namespace: None,
515 subject_key_field: default_subject_key_field(),
516 ..Default::default()
517 };
518
519 let r = publish_and_check(conf).await;
520 assert!(
521 matches!(r, Err(BuildError::Connect { .. })),
522 "publish_and_check failed, expected BuildError::Connect, got: {r:?}"
523 );
524 }
525
526 #[tokio::test]
527 async fn nats_token_auth_valid() {
528 let subject = format!("test-{}", random_string(10));
529 let url = std::env::var("NATS_TOKEN_ADDRESS")
530 .unwrap_or_else(|_| String::from("nats://localhost:4222"));
531
532 let conf = NatsSourceConfig {
533 connection_name: "".to_owned(),
534 subject: subject.clone(),
535 url,
536 queue: None,
537 framing: default_framing_message_based(),
538 decoding: default_decoding(),
539 tls: None,
540 auth: Some(NatsAuthConfig::Token {
541 token: NatsAuthToken {
542 value: "secret".to_string().into(),
543 },
544 }),
545 log_namespace: None,
546 subject_key_field: default_subject_key_field(),
547 ..Default::default()
548 };
549
550 let r = publish_and_check(conf).await;
551 assert!(
552 r.is_ok(),
553 "publish_and_check failed, expected Ok(()), got: {r:?}"
554 );
555 }
556
557 #[tokio::test]
558 async fn nats_token_auth_invalid() {
559 let subject = format!("test-{}", random_string(10));
560 let url = std::env::var("NATS_TOKEN_ADDRESS")
561 .unwrap_or_else(|_| String::from("nats://localhost:4222"));
562
563 let conf = NatsSourceConfig {
564 connection_name: "".to_owned(),
565 subject: subject.clone(),
566 url,
567 queue: None,
568 framing: default_framing_message_based(),
569 decoding: default_decoding(),
570 tls: None,
571 auth: Some(NatsAuthConfig::Token {
572 token: NatsAuthToken {
573 value: "wrongsecret".to_string().into(),
574 },
575 }),
576 log_namespace: None,
577 subject_key_field: default_subject_key_field(),
578 ..Default::default()
579 };
580
581 let r = publish_and_check(conf).await;
582 assert!(
583 matches!(r, Err(BuildError::Connect { .. })),
584 "publish_and_check failed, expected BuildError::Connect, got: {r:?}"
585 );
586 }
587
588 #[tokio::test]
589 async fn nats_nkey_auth_valid() {
590 let subject = format!("test-{}", random_string(10));
591 let url = std::env::var("NATS_NKEY_ADDRESS")
592 .unwrap_or_else(|_| String::from("nats://localhost:4222"));
593
594 let conf = NatsSourceConfig {
595 connection_name: "".to_owned(),
596 subject: subject.clone(),
597 url,
598 queue: None,
599 framing: default_framing_message_based(),
600 decoding: default_decoding(),
601 tls: None,
602 auth: Some(NatsAuthConfig::Nkey {
603 nkey: NatsAuthNKey {
604 nkey: "UD345ZYSUJQD7PNCTWQPINYSO3VH4JBSADBSYUZOBT666DRASFRAWAWT".into(),
605 seed: "SUANIRXEZUROTXNFN3TJYMT27K7ZZVMD46FRIHF6KXKS4KGNVBS57YAFGY".into(),
606 },
607 }),
608 log_namespace: None,
609 subject_key_field: default_subject_key_field(),
610 ..Default::default()
611 };
612
613 let r = publish_and_check(conf).await;
614 assert!(
615 r.is_ok(),
616 "publish_and_check failed, expected Ok(()), got: {r:?}"
617 );
618 }
619
620 #[tokio::test]
621 async fn nats_nkey_auth_invalid() {
622 let subject = format!("test-{}", random_string(10));
623 let url = std::env::var("NATS_NKEY_ADDRESS")
624 .unwrap_or_else(|_| String::from("nats://localhost:4222"));
625
626 let conf = NatsSourceConfig {
627 connection_name: "".to_owned(),
628 subject: subject.clone(),
629 url,
630 queue: None,
631 framing: default_framing_message_based(),
632 decoding: default_decoding(),
633 tls: None,
634 auth: Some(NatsAuthConfig::Nkey {
635 nkey: NatsAuthNKey {
636 nkey: "UAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA".into(),
637 seed: "SBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBB".into(),
638 },
639 }),
640 log_namespace: None,
641 subject_key_field: default_subject_key_field(),
642 ..Default::default()
643 };
644
645 let r = publish_and_check(conf).await;
646 assert!(
647 matches!(r, Err(BuildError::Connect { .. })),
648 "publish_and_check failed, expected BuildError::Config, got: {r:?}"
649 );
650 }
651
652 #[tokio::test]
653 async fn nats_tls_valid() {
654 let subject = format!("test-{}", random_string(10));
655 let url = std::env::var("NATS_TLS_ADDRESS")
656 .unwrap_or_else(|_| String::from("nats://localhost:4222"));
657
658 let conf = NatsSourceConfig {
659 connection_name: "".to_owned(),
660 subject: subject.clone(),
661 url,
662 queue: None,
663 framing: default_framing_message_based(),
664 decoding: default_decoding(),
665 tls: Some(TlsEnableableConfig {
666 enabled: Some(true),
667 options: TlsConfig {
668 ca_file: Some("tests/data/nats/rootCA.pem".into()),
669 ..Default::default()
670 },
671 }),
672 auth: None,
673 log_namespace: None,
674 subject_key_field: default_subject_key_field(),
675 ..Default::default()
676 };
677
678 let r = publish_and_check(conf).await;
679 assert!(
680 r.is_ok(),
681 "publish_and_check failed, expected Ok(()), got: {r:?}"
682 );
683 }
684
685 #[tokio::test]
686 async fn nats_tls_invalid() {
687 let subject = format!("test-{}", random_string(10));
688 let url = std::env::var("NATS_TLS_ADDRESS")
689 .unwrap_or_else(|_| String::from("nats://localhost:4222"));
690
691 let conf = NatsSourceConfig {
692 connection_name: "".to_owned(),
693 subject: subject.clone(),
694 url,
695 queue: None,
696 framing: default_framing_message_based(),
697 decoding: default_decoding(),
698 tls: None,
699 auth: None,
700 log_namespace: None,
701 subject_key_field: default_subject_key_field(),
702 ..Default::default()
703 };
704
705 let r = publish_and_check(conf).await;
706 assert!(
707 matches!(r, Err(BuildError::Connect { .. })),
708 "publish_and_check failed, expected BuildError::Connect, got: {r:?}"
709 );
710 }
711
712 #[tokio::test]
713 async fn nats_tls_client_cert_valid() {
714 let subject = format!("test-{}", random_string(10));
715 let url = std::env::var("NATS_TLS_CLIENT_CERT_ADDRESS")
716 .unwrap_or_else(|_| String::from("nats://localhost:4222"));
717
718 let conf = NatsSourceConfig {
719 connection_name: "".to_owned(),
720 subject: subject.clone(),
721 url,
722 queue: None,
723 framing: default_framing_message_based(),
724 decoding: default_decoding(),
725 tls: Some(TlsEnableableConfig {
726 enabled: Some(true),
727 options: TlsConfig {
728 ca_file: Some("tests/data/nats/rootCA.pem".into()),
729 crt_file: Some("tests/data/nats/nats-client.pem".into()),
730 key_file: Some("tests/data/nats/nats-client.key".into()),
731 ..Default::default()
732 },
733 }),
734 auth: None,
735 log_namespace: None,
736 subject_key_field: default_subject_key_field(),
737 ..Default::default()
738 };
739
740 let r = publish_and_check(conf).await;
741 assert!(
742 r.is_ok(),
743 "publish_and_check failed, expected Ok(()), got: {r:?}"
744 );
745 }
746
747 #[tokio::test]
748 async fn nats_tls_client_cert_invalid() {
749 let subject = format!("test-{}", random_string(10));
750 let url = std::env::var("NATS_TLS_CLIENT_CERT_ADDRESS")
751 .unwrap_or_else(|_| String::from("nats://localhost:4222"));
752
753 let conf = NatsSourceConfig {
754 connection_name: "".to_owned(),
755 subject: subject.clone(),
756 url,
757 queue: None,
758 framing: default_framing_message_based(),
759 decoding: default_decoding(),
760 tls: Some(TlsEnableableConfig {
761 enabled: Some(true),
762 options: TlsConfig {
763 ca_file: Some("tests/data/nats/rootCA.pem".into()),
764 ..Default::default()
765 },
766 }),
767 auth: None,
768 log_namespace: None,
769 subject_key_field: default_subject_key_field(),
770 ..Default::default()
771 };
772
773 let r = publish_and_check(conf).await;
774 assert!(
775 matches!(r, Err(BuildError::Connect { .. })),
776 "publish_and_check failed, expected BuildError::Connect, got: {r:?}"
777 );
778 }
779
780 #[tokio::test]
781 async fn nats_tls_jwt_auth_valid() {
782 let subject = format!("test-{}", random_string(10));
783 let url = std::env::var("NATS_JWT_ADDRESS")
784 .unwrap_or_else(|_| String::from("nats://localhost:4222"));
785
786 let conf = NatsSourceConfig {
787 connection_name: "".to_owned(),
788 subject: subject.clone(),
789 url,
790 queue: None,
791 framing: default_framing_message_based(),
792 decoding: default_decoding(),
793 tls: Some(TlsEnableableConfig {
794 enabled: Some(true),
795 options: TlsConfig {
796 ca_file: Some("tests/data/nats/rootCA.pem".into()),
797 ..Default::default()
798 },
799 }),
800 auth: Some(NatsAuthConfig::CredentialsFile {
801 credentials_file: NatsAuthCredentialsFile {
802 path: "tests/data/nats/nats.creds".into(),
803 },
804 }),
805 log_namespace: None,
806 subject_key_field: default_subject_key_field(),
807 ..Default::default()
808 };
809
810 let r = publish_and_check(conf).await;
811 assert!(
812 r.is_ok(),
813 "publish_and_check failed, expected Ok(()), got: {r:?}"
814 );
815 }
816
817 #[tokio::test]
818 async fn nats_tls_jwt_auth_invalid() {
819 let subject = format!("test-{}", random_string(10));
820 let url = std::env::var("NATS_JWT_ADDRESS")
821 .unwrap_or_else(|_| String::from("nats://localhost:4222"));
822
823 let conf = NatsSourceConfig {
824 connection_name: "".to_owned(),
825 subject: subject.clone(),
826 url,
827 queue: None,
828 framing: default_framing_message_based(),
829 decoding: default_decoding(),
830 tls: Some(TlsEnableableConfig {
831 enabled: Some(true),
832 options: TlsConfig {
833 ca_file: Some("tests/data/nats/rootCA.pem".into()),
834 ..Default::default()
835 },
836 }),
837 auth: Some(NatsAuthConfig::CredentialsFile {
838 credentials_file: NatsAuthCredentialsFile {
839 path: "tests/data/nats/nats-bad.creds".into(),
840 },
841 }),
842 log_namespace: None,
843 subject_key_field: default_subject_key_field(),
844 ..Default::default()
845 };
846
847 let r = publish_and_check(conf).await;
848 assert!(
849 matches!(r, Err(BuildError::Config { .. })),
850 "publish_and_check failed, expected BuildError::Config, got: {r:?}"
851 );
852 }
853
854 #[tokio::test]
855 async fn nats_multiple_urls_valid() {
856 let subject = format!("test-{}", random_string(10));
857
858 let conf = NatsSourceConfig {
859 connection_name: "".to_owned(),
860 subject: subject.clone(),
861 url: "nats://nats:4222,nats://demo.nats.io:4222".to_string(),
862 queue: None,
863 framing: default_framing_message_based(),
864 decoding: default_decoding(),
865 tls: None,
866 auth: None,
867 log_namespace: None,
868 subject_key_field: default_subject_key_field(),
869 ..Default::default()
870 };
871
872 let r = publish_and_check(conf).await;
873 assert!(
874 r.is_ok(),
875 "publish_and_check failed for multiple URLs, expected Ok(()), got: {r:?}"
876 );
877 }
878
879 #[tokio::test]
880 async fn nats_multiple_urls_invalid() {
881 let subject = format!("test-{}", random_string(10));
882
883 let conf = NatsSourceConfig {
884 connection_name: "".to_owned(),
885 subject: subject.clone(),
886 url: "http://invalid-url,nats://:invalid@localhost:4222".to_string(),
887 queue: None,
888 framing: default_framing_message_based(),
889 decoding: default_decoding(),
890 tls: None,
891 auth: None,
892 log_namespace: None,
893 subject_key_field: default_subject_key_field(),
894 ..Default::default()
895 };
896
897 let r = publish_and_check(conf).await;
898 assert!(
899 matches!(r, Err(BuildError::Connect { .. })),
900 "publish_and_check failed for bad URLs, expected BuildError::Connect, got: {r:?}"
901 );
902 }
903}