1pub mod tcp;
2pub mod udp;
3#[cfg(unix)]
4mod unix;
5
6use vector_lib::{
7 codecs::decoding::DeserializerConfig,
8 config::{LegacyKey, LogNamespace, log_schema},
9 configurable::configurable_component,
10 lookup::{lookup_v2::OptionalValuePath, owned_value_path},
11};
12use vrl::value::{Kind, kind::Collection};
13
14use crate::{
15 codecs::DecodingConfig,
16 config::{GenerateConfig, Resource, SourceConfig, SourceContext, SourceOutput},
17 sources::util::net::TcpSource,
18 tls::MaybeTlsSettings,
19};
20
21#[configurable_component(source("socket", "Collect logs over a socket."))]
23#[derive(Clone, Debug)]
24pub struct SocketConfig {
25 #[serde(flatten)]
26 pub mode: Mode,
27}
28
29#[configurable_component]
31#[derive(Clone, Debug)]
32#[serde(tag = "mode", rename_all = "snake_case")]
33#[configurable(metadata(docs::enum_tag_description = "The type of socket to use."))]
34#[allow(clippy::large_enum_variant)] pub enum Mode {
36 Tcp(tcp::TcpConfig),
38
39 Udp(udp::UdpConfig),
41
42 #[cfg(unix)]
44 UnixDatagram(unix::UnixConfig),
45
46 #[cfg(unix)]
48 #[serde(alias = "unix")]
49 UnixStream(unix::UnixConfig),
50}
51
52impl SocketConfig {
53 pub fn new_tcp(tcp_config: tcp::TcpConfig) -> Self {
54 tcp_config.into()
55 }
56
57 pub fn make_basic_tcp_config(addr: std::net::SocketAddr) -> Self {
58 tcp::TcpConfig::from_address(addr.into()).into()
59 }
60
61 fn decoding(&self) -> DeserializerConfig {
62 match &self.mode {
63 Mode::Tcp(config) => config.decoding().clone(),
64 Mode::Udp(config) => config.decoding().clone(),
65 #[cfg(unix)]
66 Mode::UnixDatagram(config) => config.decoding().clone(),
67 #[cfg(unix)]
68 Mode::UnixStream(config) => config.decoding().clone(),
69 }
70 }
71
72 fn log_namespace(&self, global_log_namespace: LogNamespace) -> LogNamespace {
73 match &self.mode {
74 Mode::Tcp(config) => global_log_namespace.merge(config.log_namespace),
75 Mode::Udp(config) => global_log_namespace.merge(config.log_namespace),
76 #[cfg(unix)]
77 Mode::UnixDatagram(config) => global_log_namespace.merge(config.log_namespace),
78 #[cfg(unix)]
79 Mode::UnixStream(config) => global_log_namespace.merge(config.log_namespace),
80 }
81 }
82}
83
84impl From<tcp::TcpConfig> for SocketConfig {
85 fn from(config: tcp::TcpConfig) -> Self {
86 SocketConfig {
87 mode: Mode::Tcp(config),
88 }
89 }
90}
91
92impl From<udp::UdpConfig> for SocketConfig {
93 fn from(config: udp::UdpConfig) -> Self {
94 SocketConfig {
95 mode: Mode::Udp(config),
96 }
97 }
98}
99
100impl GenerateConfig for SocketConfig {
101 fn generate_config() -> toml::Value {
102 toml::from_str(
103 r#"mode = "tcp"
104 address = "0.0.0.0:9000""#,
105 )
106 .unwrap()
107 }
108}
109
110#[async_trait::async_trait]
111#[typetag::serde(name = "socket")]
112impl SourceConfig for SocketConfig {
113 async fn build(&self, cx: SourceContext) -> crate::Result<super::Source> {
114 match self.mode.clone() {
115 Mode::Tcp(config) => {
116 let log_namespace = cx.log_namespace(config.log_namespace);
117
118 let decoding = config.decoding().clone();
119 let decoder = DecodingConfig::new(
120 config
121 .framing
122 .clone()
123 .unwrap_or_else(|| decoding.default_stream_framing()),
124 decoding,
125 log_namespace,
126 )
127 .build()?;
128
129 let tcp = tcp::RawTcpSource::new(config.clone(), decoder, log_namespace);
130 let tls_config = config.tls().as_ref().map(|tls| tls.tls_config.clone());
131 let tls_client_metadata_key = config
132 .tls()
133 .as_ref()
134 .and_then(|tls| tls.client_metadata_key.clone())
135 .and_then(|k| k.path);
136 let tls = MaybeTlsSettings::from_config(tls_config.as_ref(), true)?;
137 tcp.run(
138 config.address(),
139 config.keepalive(),
140 config.shutdown_timeout_secs(),
141 tls,
142 tls_client_metadata_key,
143 config.receive_buffer_bytes(),
144 config.max_connection_duration_secs(),
145 cx,
146 false.into(),
147 config.connection_limit,
148 config.permit_origin.map(Into::into),
149 SocketConfig::NAME,
150 log_namespace,
151 )
152 }
153 Mode::Udp(config) => {
154 let log_namespace = cx.log_namespace(config.log_namespace);
155 let decoding = config.decoding().clone();
156 let framing = config
157 .framing()
158 .clone()
159 .unwrap_or_else(|| decoding.default_message_based_framing());
160 let decoder = DecodingConfig::new(framing, decoding, log_namespace).build()?;
161 Ok(udp::udp(
162 config,
163 decoder,
164 cx.shutdown,
165 cx.out,
166 log_namespace,
167 ))
168 }
169 #[cfg(unix)]
170 Mode::UnixDatagram(config) => {
171 let log_namespace = cx.log_namespace(config.log_namespace);
172 let decoding = config.decoding.clone();
173 let framing = config
174 .framing
175 .clone()
176 .unwrap_or_else(|| decoding.default_message_based_framing());
177 let decoder = DecodingConfig::new(framing, decoding, log_namespace).build()?;
178
179 unix::unix_datagram(config, decoder, cx.shutdown, cx.out, log_namespace)
180 }
181 #[cfg(unix)]
182 Mode::UnixStream(config) => {
183 let log_namespace = cx.log_namespace(config.log_namespace);
184
185 let decoding = config.decoding().clone();
186 let decoder = DecodingConfig::new(
187 config
188 .framing
189 .clone()
190 .unwrap_or_else(|| decoding.default_stream_framing()),
191 decoding,
192 log_namespace,
193 )
194 .build()?;
195
196 unix::unix_stream(config, decoder, cx.shutdown, cx.out, log_namespace)
197 }
198 }
199 }
200
201 fn outputs(&self, global_log_namespace: LogNamespace) -> Vec<SourceOutput> {
202 let log_namespace = self.log_namespace(global_log_namespace);
203
204 let schema_definition = self
205 .decoding()
206 .schema_definition(log_namespace)
207 .with_standard_vector_source_metadata();
208
209 let schema_definition = match &self.mode {
210 Mode::Tcp(config) => {
211 let legacy_host_key = config.host_key().path.map(LegacyKey::InsertIfEmpty);
212
213 let legacy_port_key = config.port_key().clone().path.map(LegacyKey::InsertIfEmpty);
214
215 let tls_client_metadata_path = config
216 .tls()
217 .as_ref()
218 .and_then(|tls| tls.client_metadata_key.as_ref())
219 .and_then(|k| k.path.clone())
220 .map(LegacyKey::Overwrite);
221
222 schema_definition
223 .with_source_metadata(
224 Self::NAME,
225 legacy_host_key,
226 &owned_value_path!("host"),
227 Kind::bytes(),
228 Some("host"),
229 )
230 .with_source_metadata(
231 Self::NAME,
232 legacy_port_key,
233 &owned_value_path!("port"),
234 Kind::integer(),
235 None,
236 )
237 .with_source_metadata(
238 Self::NAME,
239 tls_client_metadata_path,
240 &owned_value_path!("tls_client_metadata"),
241 Kind::object(Collection::empty().with_unknown(Kind::bytes()))
242 .or_undefined(),
243 None,
244 )
245 }
246 Mode::Udp(config) => {
247 let legacy_host_key = config.host_key().path.map(LegacyKey::InsertIfEmpty);
248
249 let legacy_port_key = config.port_key().clone().path.map(LegacyKey::InsertIfEmpty);
250
251 schema_definition
252 .with_source_metadata(
253 Self::NAME,
254 legacy_host_key,
255 &owned_value_path!("host"),
256 Kind::bytes(),
257 None,
258 )
259 .with_source_metadata(
260 Self::NAME,
261 legacy_port_key,
262 &owned_value_path!("port"),
263 Kind::integer(),
264 None,
265 )
266 }
267 #[cfg(unix)]
268 Mode::UnixDatagram(config) => {
269 let legacy_host_key = config.host_key().clone().path.map(LegacyKey::InsertIfEmpty);
270
271 schema_definition.with_source_metadata(
272 Self::NAME,
273 legacy_host_key,
274 &owned_value_path!("host"),
275 Kind::bytes(),
276 None,
277 )
278 }
279 #[cfg(unix)]
280 Mode::UnixStream(config) => {
281 let legacy_host_key = config.host_key().clone().path.map(LegacyKey::InsertIfEmpty);
282
283 schema_definition.with_source_metadata(
284 Self::NAME,
285 legacy_host_key,
286 &owned_value_path!("host"),
287 Kind::bytes(),
288 None,
289 )
290 }
291 };
292
293 vec![SourceOutput::new_maybe_logs(
294 self.decoding().output_type(),
295 schema_definition,
296 )]
297 }
298
299 fn resources(&self) -> Vec<Resource> {
300 match self.mode.clone() {
301 Mode::Tcp(tcp) => vec![tcp.address().as_tcp_resource()],
302 Mode::Udp(udp) => vec![udp.address().as_udp_resource()],
303 #[cfg(unix)]
304 Mode::UnixDatagram(_) => vec![],
305 #[cfg(unix)]
306 Mode::UnixStream(_) => vec![],
307 }
308 }
309
310 fn can_acknowledge(&self) -> bool {
311 false
312 }
313}
314
315pub(crate) fn default_host_key() -> OptionalValuePath {
316 log_schema().host_key().cloned().into()
317}
318
319#[cfg(test)]
320mod test {
321 use std::{
322 collections::HashMap,
323 net::{IpAddr, Ipv4Addr, SocketAddr, UdpSocket},
324 sync::{
325 Arc,
326 atomic::{AtomicBool, Ordering},
327 },
328 thread,
329 };
330
331 use approx::assert_relative_eq;
332 use bytes::{BufMut, Bytes, BytesMut};
333 use futures::{StreamExt, stream};
334 use rand::{SeedableRng, rngs::SmallRng, seq::SliceRandom};
335 use serde_json::json;
336 use tokio::{
337 io::AsyncReadExt,
338 net::TcpStream,
339 task::JoinHandle,
340 time::{Duration, Instant, timeout},
341 };
342 #[cfg(unix)]
343 use vector_lib::codecs::{
344 CharacterDelimitedDecoderConfig, decoding::CharacterDelimitedDecoderOptions,
345 };
346 use vector_lib::{
347 codecs::{GelfDeserializerConfig, NewlineDelimitedDecoderConfig},
348 event::EventContainer,
349 lookup::{lookup_v2::OptionalValuePath, owned_value_path, path},
350 };
351 use vrl::{btreemap, value, value::ObjectMap};
352 #[cfg(unix)]
353 use {
354 super::{Mode, unix::UnixConfig},
355 crate::sources::util::unix::UNNAMED_SOCKET_HOST,
356 crate::test_util::wait_for,
357 futures::{SinkExt, Stream},
358 std::future::ready,
359 std::os::unix::fs::PermissionsExt,
360 std::path::PathBuf,
361 tokio::{
362 io::AsyncWriteExt,
363 net::{UnixDatagram, UnixStream},
364 task::yield_now,
365 },
366 tokio_util::codec::{FramedWrite, LinesCodec},
367 };
368
369 use super::{SocketConfig, tcp::TcpConfig, udp::UdpConfig};
370 use crate::{
371 SourceSender,
372 config::{ComponentKey, GlobalOptions, SourceConfig, SourceContext, log_schema},
373 event::{Event, LogEvent},
374 shutdown::{ShutdownSignal, SourceShutdownCoordinator},
375 sinks::util::tcp::TcpSinkConfig,
376 sources::util::net::SocketListenAddr,
377 test_util::{
378 addr::{PortGuard, next_addr, next_addr_any},
379 collect_n, collect_n_limited,
380 components::{
381 COMPONENT_ERROR_TAGS, SOCKET_PUSH_SOURCE_TAGS, assert_source_compliance,
382 assert_source_error,
383 },
384 random_string, send_lines, send_lines_tls, wait_for_tcp,
385 },
386 tls::{self, TlsConfig, TlsEnableableConfig, TlsSourceConfig},
387 };
388
389 async fn wait_for_tcp_and_release(guard: PortGuard, addr: SocketAddr) {
390 wait_for_tcp(addr).await;
391 drop(guard) }
393
394 pub fn bind_unused_udp() -> UdpSocket {
395 UdpSocket::bind((IpAddr::V4(Ipv4Addr::LOCALHOST), 0))
397 .expect("Failed to bind UDP socket to OS-assigned port")
398 }
399
400 pub fn bind_unused_udp_any() -> UdpSocket {
401 UdpSocket::bind((IpAddr::V4(Ipv4Addr::UNSPECIFIED), 0))
403 .expect("Failed to bind UDP socket to OS-assigned port")
404 }
405
406 fn get_gelf_payload(message: &str) -> String {
407 serde_json::to_string(&json!({
408 "version": "1.1",
409 "host": "example.org",
410 "short_message": message,
411 "timestamp": 1234567890.123,
412 "level": 6,
413 "_foo": "bar",
414 }))
415 .unwrap()
416 }
417
418 fn create_gelf_chunk(
419 message_id: u64,
420 sequence_number: u8,
421 total_chunks: u8,
422 payload: &[u8],
423 ) -> Bytes {
424 const GELF_MAGIC: [u8; 2] = [0x1e, 0x0f];
425 let mut chunk = BytesMut::new();
426 chunk.put_slice(&GELF_MAGIC);
427 chunk.put_u64(message_id);
428 chunk.put_u8(sequence_number);
429 chunk.put_u8(total_chunks);
430 chunk.put(payload);
431 chunk.freeze()
432 }
433
434 fn get_gelf_chunks(short_message: &str, max_size: usize, rng: &mut SmallRng) -> Vec<Bytes> {
435 let message_id = rand::random();
436 let payload = get_gelf_payload(short_message);
437 let payload_chunks = payload.as_bytes().chunks(max_size).collect::<Vec<_>>();
438 let total_chunks = payload_chunks.len();
439 assert!(total_chunks <= 128, "too many gelf chunks");
440
441 let mut chunks = payload_chunks
442 .into_iter()
443 .enumerate()
444 .map(|(i, payload_chunk)| {
445 create_gelf_chunk(message_id, i as u8, total_chunks as u8, payload_chunk)
446 })
447 .collect::<Vec<_>>();
448 chunks.shuffle(rng);
450 chunks
451 }
452
453 #[test]
454 fn generate_config() {
455 crate::test_util::test_generate_config::<SocketConfig>();
456 }
457
458 #[tokio::test]
460 async fn tcp_it_includes_host() {
461 assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async {
462 let (tx, mut rx) = SourceSender::new_test();
463 let (guard, addr) = next_addr();
464
465 let server = SocketConfig::from(TcpConfig::from_address(addr.into()))
466 .build(SourceContext::new_test(tx, None))
467 .await
468 .unwrap();
469 tokio::spawn(server);
470
471 wait_for_tcp_and_release(guard, addr).await;
472
473 let addr = send_lines(addr, vec!["test".to_owned()].into_iter())
474 .await
475 .unwrap();
476
477 let event = rx.next().await.unwrap();
478
479 assert_eq!(event.as_log()["host"], addr.ip().to_string().into());
480 assert_eq!(event.as_log()["port"], addr.port().into());
481 })
482 .await;
483 }
484
485 #[tokio::test]
486 async fn tcp_it_includes_vector_namespaced_fields() {
487 assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async {
488 let (tx, mut rx) = SourceSender::new_test();
489 let (guard, addr) = next_addr();
490 let mut conf = TcpConfig::from_address(addr.into());
491 conf.set_log_namespace(Some(true));
492
493 let server = SocketConfig::from(conf)
494 .build(SourceContext::new_test(tx, None))
495 .await
496 .unwrap();
497 tokio::spawn(server);
498
499 wait_for_tcp_and_release(guard, addr).await;
500
501 let addr = send_lines(addr, vec!["test".to_owned()].into_iter())
502 .await
503 .unwrap();
504
505 let event = rx.next().await.unwrap();
506 let log = event.as_log();
507 let event_meta = log.metadata().value();
508
509 assert_eq!(log.value(), &"test".into());
510 assert_eq!(
511 event_meta.get(path!("vector", "source_type")).unwrap(),
512 &value!(SocketConfig::NAME)
513 );
514 assert_eq!(
515 event_meta.get(path!(SocketConfig::NAME, "host")).unwrap(),
516 &value!(addr.ip().to_string())
517 );
518 assert_eq!(
519 event_meta.get(path!(SocketConfig::NAME, "port")).unwrap(),
520 &value!(addr.port())
521 );
522 })
523 .await;
524 }
525
526 #[tokio::test]
527 async fn tcp_splits_on_newline() {
528 assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async {
529 let (tx, rx) = SourceSender::new_test();
530 let (guard, addr) = next_addr();
531
532 let server = SocketConfig::from(TcpConfig::from_address(addr.into()))
533 .build(SourceContext::new_test(tx, None))
534 .await
535 .unwrap();
536 tokio::spawn(server);
537
538 wait_for_tcp_and_release(guard, addr).await;
539
540 send_lines(addr, vec!["foo\nbar".to_owned()].into_iter())
541 .await
542 .unwrap();
543
544 let events = collect_n(rx, 2).await;
545
546 assert_eq!(events.len(), 2);
547 assert_eq!(
548 events[0].as_log()[log_schema().message_key().unwrap().to_string()],
549 "foo".into()
550 );
551 assert_eq!(
552 events[1].as_log()[log_schema().message_key().unwrap().to_string()],
553 "bar".into()
554 );
555 })
556 .await;
557 }
558
559 #[tokio::test]
560 async fn tcp_it_includes_source_type() {
561 assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async {
562 let (tx, mut rx) = SourceSender::new_test();
563 let (guard, addr) = next_addr();
564
565 let server = SocketConfig::from(TcpConfig::from_address(addr.into()))
566 .build(SourceContext::new_test(tx, None))
567 .await
568 .unwrap();
569 tokio::spawn(server);
570
571 wait_for_tcp_and_release(guard, addr).await;
572 send_lines(addr, vec!["test".to_owned()].into_iter())
573 .await
574 .unwrap();
575
576 let event = rx.next().await.unwrap();
577 assert_eq!(
578 event.as_log()[log_schema().source_type_key().unwrap().to_string()],
579 "socket".into()
580 );
581 })
582 .await;
583 }
584
585 #[tokio::test]
586 async fn tcp_continue_after_long_line() {
587 assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async {
588 let (tx, mut rx) = SourceSender::new_test();
589 let (guard, addr) = next_addr();
590
591 let mut config = TcpConfig::from_address(addr.into());
592 config.set_framing(Some(
593 NewlineDelimitedDecoderConfig::new_with_max_length(10).into(),
594 ));
595
596 let server = SocketConfig::from(config)
597 .build(SourceContext::new_test(tx, None))
598 .await
599 .unwrap();
600 tokio::spawn(server);
601
602 let lines = vec![
603 "short".to_owned(),
604 "this is too long".to_owned(),
605 "more short".to_owned(),
606 ];
607
608 wait_for_tcp_and_release(guard, addr).await;
609 send_lines(addr, lines.into_iter()).await.unwrap();
610
611 let event = rx.next().await.unwrap();
612 assert_eq!(
613 event.as_log()[log_schema().message_key().unwrap().to_string()],
614 "short".into()
615 );
616
617 let event = rx.next().await.unwrap();
618 assert_eq!(
619 event.as_log()[log_schema().message_key().unwrap().to_string()],
620 "more short".into()
621 );
622 })
623 .await;
624 }
625
626 #[tokio::test]
627 async fn tcp_with_tls() {
628 assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async {
629 let (tx, mut rx) = SourceSender::new_test();
630 let (guard, addr) = next_addr();
631
632 let mut config = TcpConfig::from_address(addr.into());
633 config.set_tls(Some(TlsSourceConfig {
634 tls_config: TlsEnableableConfig {
635 enabled: Some(true),
636 options: TlsConfig {
637 verify_certificate: Some(true),
638 crt_file: Some(tls::TEST_PEM_CRT_PATH.into()),
639 key_file: Some(tls::TEST_PEM_KEY_PATH.into()),
640 ca_file: Some(tls::TEST_PEM_CA_PATH.into()),
641 ..Default::default()
642 },
643 },
644 client_metadata_key: Some(OptionalValuePath::from(owned_value_path!("tls_peer"))),
645 }));
646
647 let server = SocketConfig::from(config)
648 .build(SourceContext::new_test(tx, None))
649 .await
650 .unwrap();
651 tokio::spawn(server);
652
653 let lines = vec!["one line".to_owned(), "another line".to_owned()];
654
655 wait_for_tcp_and_release(guard, addr).await;
656 send_lines_tls(
657 addr,
658 "localhost".into(),
659 lines.into_iter(),
660 std::path::Path::new(tls::TEST_PEM_CA_PATH),
661 std::path::Path::new(tls::TEST_PEM_CLIENT_CRT_PATH),
662 std::path::Path::new(tls::TEST_PEM_CLIENT_KEY_PATH),
663 )
664 .await
665 .unwrap();
666
667 let event = rx.next().await.unwrap();
668 assert_eq!(
669 event.as_log()[log_schema().message_key().unwrap().to_string()],
670 "one line".into()
671 );
672
673 let tls_meta: ObjectMap = btreemap!(
674 "subject" => "CN=localhost,OU=Vector,O=Datadog,L=New York,ST=New York,C=US"
675 );
676
677 assert_eq!(event.as_log()["tls_peer"], tls_meta.clone().into(),);
678
679 let event = rx.next().await.unwrap();
680 assert_eq!(
681 event.as_log()[log_schema().message_key().unwrap().to_string()],
682 "another line".into()
683 );
684
685 assert_eq!(event.as_log()["tls_peer"], tls_meta.clone().into(),);
686 })
687 .await;
688 }
689
690 #[tokio::test]
691 async fn tcp_with_tls_vector_namespace() {
692 assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async {
693 let (tx, mut rx) = SourceSender::new_test();
694 let (guard, addr) = next_addr();
695
696 let mut config = TcpConfig::from_address(addr.into());
697 config.set_tls(Some(TlsSourceConfig {
698 tls_config: TlsEnableableConfig {
699 enabled: Some(true),
700 options: TlsConfig {
701 verify_certificate: Some(true),
702 crt_file: Some(tls::TEST_PEM_CRT_PATH.into()),
703 key_file: Some(tls::TEST_PEM_KEY_PATH.into()),
704 ca_file: Some(tls::TEST_PEM_CA_PATH.into()),
705 ..Default::default()
706 },
707 },
708 client_metadata_key: None,
709 }));
710 config.log_namespace = Some(true);
711
712 let server = SocketConfig::from(config)
713 .build(SourceContext::new_test(tx, None))
714 .await
715 .unwrap();
716 tokio::spawn(server);
717
718 let lines = vec!["one line".to_owned(), "another line".to_owned()];
719
720 wait_for_tcp_and_release(guard, addr).await;
721 send_lines_tls(
722 addr,
723 "localhost".into(),
724 lines.into_iter(),
725 std::path::Path::new(tls::TEST_PEM_CA_PATH),
726 std::path::Path::new(tls::TEST_PEM_CLIENT_CRT_PATH),
727 std::path::Path::new(tls::TEST_PEM_CLIENT_KEY_PATH),
728 )
729 .await
730 .unwrap();
731
732 let event = rx.next().await.unwrap();
733 let log = event.as_log();
734 let event_meta = log.metadata().value();
735
736 assert_eq!(log.value(), &"one line".into());
737
738 let tls_meta: ObjectMap = btreemap!(
739 "subject" => "CN=localhost,OU=Vector,O=Datadog,L=New York,ST=New York,C=US"
740 );
741
742 assert_eq!(
743 event_meta
744 .get(path!(SocketConfig::NAME, "tls_client_metadata"))
745 .unwrap(),
746 &value!(tls_meta.clone())
747 );
748
749 let event = rx.next().await.unwrap();
750 let log = event.as_log();
751 let event_meta = log.metadata().value();
752
753 assert_eq!(log.value(), &"another line".into());
754
755 assert_eq!(
756 event_meta
757 .get(path!(SocketConfig::NAME, "tls_client_metadata"))
758 .unwrap(),
759 &value!(tls_meta.clone())
760 );
761 })
762 .await;
763 }
764
765 #[tokio::test]
766 async fn tcp_shutdown_simple() {
767 assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async {
768 let source_id = ComponentKey::from("tcp_shutdown_simple");
769 let (tx, mut rx) = SourceSender::new_test();
770 let (guard, addr) = next_addr();
771 let (cx, mut shutdown) = SourceContext::new_shutdown(&source_id, tx);
772
773 let server = SocketConfig::from(TcpConfig::from_address(addr.into()))
775 .build(cx)
776 .await
777 .unwrap();
778 let source_handle = tokio::spawn(server);
779
780 wait_for_tcp_and_release(guard, addr).await;
782 send_lines(addr, vec!["test".to_owned()].into_iter())
783 .await
784 .unwrap();
785
786 let event = rx.next().await.unwrap();
787 assert_eq!(
788 event.as_log()[log_schema().message_key().unwrap().to_string()],
789 "test".into()
790 );
791
792 let deadline = Instant::now() + Duration::from_secs(10);
794 let shutdown_complete = shutdown.shutdown_source(&source_id, deadline);
795 let shutdown_success = shutdown_complete.await;
796 assert!(shutdown_success);
797
798 _ = source_handle.await.unwrap();
800 })
801 .await;
802 }
803
804 #[tokio::test]
807 async fn tcp_shutdown_infinite_stream() {
808 let (guard, addr) = next_addr();
813
814 let (source_tx, source_rx) = SourceSender::new_test_sender_with_options(10_000, None);
815 let source_key = ComponentKey::from("tcp_shutdown_infinite_stream");
816 let (source_cx, mut shutdown) = SourceContext::new_shutdown(&source_key, source_tx);
817
818 let mut source_config = TcpConfig::from_address(addr.into());
819 source_config.set_shutdown_timeout_secs(1);
820 let source_task = SocketConfig::from(source_config)
821 .build(source_cx)
822 .await
823 .unwrap();
824
825 let source_handle = tokio::spawn(source_task);
827 wait_for_tcp_and_release(guard, addr).await;
828
829 let message = random_string(512);
833 let message_bytes = Bytes::from(message.clone());
834
835 #[derive(Clone, Debug)]
836 struct Serializer {
837 bytes: Bytes,
838 }
839 impl tokio_util::codec::Encoder<Event> for Serializer {
840 type Error = vector_lib::codecs::encoding::Error;
841
842 fn encode(&mut self, _: Event, buffer: &mut BytesMut) -> Result<(), Self::Error> {
843 buffer.put(self.bytes.as_ref());
844 buffer.put_u8(b'\n');
845 Ok(())
846 }
847 }
848 let sink_config = TcpSinkConfig::from_address(format!("localhost:{}", addr.port()));
849 let encoder = Serializer {
850 bytes: message_bytes,
851 };
852 let (sink, _healthcheck) = sink_config.build(Default::default(), encoder).unwrap();
853
854 tokio::spawn(async move {
855 let input = stream::repeat_with(|| LogEvent::default().into()).boxed();
856 sink.run(input).await.unwrap();
857 });
858
859 let events = collect_n_limited(source_rx, 100)
863 .await
864 .into_iter()
865 .collect::<Vec<_>>();
866 assert_eq!(100, events.len());
867
868 let message_key = log_schema().message_key().unwrap().to_string();
869 let expected_message = message.clone().into();
870 for event in events.into_iter().flat_map(EventContainer::into_events) {
871 assert_eq!(event.as_log()[message_key.as_str()], expected_message);
872 }
873
874 let shutdown_timeout_limit = Duration::from_secs(10);
877 let deadline = Instant::now() + shutdown_timeout_limit;
878 let shutdown_complete = shutdown.shutdown_source(&source_key, deadline);
879
880 let shutdown_result = timeout(shutdown_timeout_limit, shutdown_complete).await;
881 assert_eq!(shutdown_result, Ok(true));
882
883 let source_result = source_handle.await.expect("source task should not panic");
884 assert_eq!(source_result, Ok(()));
885 }
886
887 #[tokio::test]
888 async fn tcp_connection_close_after_max_duration() {
889 let (tx, _) = SourceSender::new_test();
890 let (guard, addr) = next_addr();
891
892 let mut source_config = TcpConfig::from_address(addr.into());
893 source_config.set_max_connection_duration_secs(Some(1));
894 let source_task = SocketConfig::from(source_config)
895 .build(SourceContext::new_test(tx, None))
896 .await
897 .unwrap();
898
899 drop(tokio::spawn(source_task));
901 wait_for_tcp_and_release(guard, addr).await;
902
903 let mut stream: TcpStream = TcpStream::connect(addr)
904 .await
905 .expect("stream should be able to connect");
906 let start = Instant::now();
907
908 let timeout = tokio::time::sleep(Duration::from_millis(1200));
909 let mut buffer = [0u8; 10];
910
911 tokio::select! {
912 _ = timeout => {
913 panic!("timed out waiting for stream to close")
914 },
915 read_result = stream.read(&mut buffer) => {
916 match read_result {
917 Ok(0) => assert_relative_eq!(start.elapsed().as_secs_f64(), 1.0, epsilon = 0.3),
919 Ok(_) => panic!("unexpectedly read data from stream"),
920 Err(e) => panic!("{e:}")
921 }
922 }
923 }
924 }
925
926 async fn send_lines_udp(to: SocketAddr, lines: impl IntoIterator<Item = String>) -> UdpSocket {
928 send_lines_udp_from(bind_unused_udp(), to, lines)
929 }
930
931 fn send_lines_udp_from(
932 from: UdpSocket,
933 to: SocketAddr,
934 lines: impl IntoIterator<Item = String>,
935 ) -> UdpSocket {
936 send_packets_udp_from(from, to, lines.into_iter().map(|line| line.into()))
937 }
938
939 async fn send_packets_udp(
940 to: SocketAddr,
941 packets: impl IntoIterator<Item = Bytes>,
942 ) -> UdpSocket {
943 send_packets_udp_from(bind_unused_udp(), to, packets)
944 }
945
946 fn send_packets_udp_from(
947 from: UdpSocket,
948 to: SocketAddr,
949 packets: impl IntoIterator<Item = Bytes>,
950 ) -> UdpSocket {
951 for packet in packets {
952 assert_eq!(
953 from.send_to(&packet, to)
954 .map_err(|error| panic!("{error:}"))
955 .ok()
956 .unwrap(),
957 packet.len()
958 );
959 thread::sleep(Duration::from_millis(1));
961 }
962
963 thread::sleep(Duration::from_millis(10));
965
966 from
968 }
969
970 async fn init_udp_with_shutdown(
971 sender: SourceSender,
972 source_id: &ComponentKey,
973 shutdown: &mut SourceShutdownCoordinator,
974 ) -> (SocketAddr, JoinHandle<Result<(), ()>>) {
975 let (shutdown_signal, _) = shutdown.register_source(source_id, false);
976 init_udp_inner(sender, source_id, shutdown_signal, None, false).await
977 }
978
979 async fn init_udp(sender: SourceSender, use_log_namespace: bool) -> SocketAddr {
980 init_udp_inner(
981 sender,
982 &ComponentKey::from("default"),
983 ShutdownSignal::noop(),
984 None,
985 use_log_namespace,
986 )
987 .await
988 .0
989 }
990
991 async fn init_udp_with_config(sender: SourceSender, config: UdpConfig) -> SocketAddr {
992 init_udp_inner(
993 sender,
994 &ComponentKey::from("default"),
995 ShutdownSignal::noop(),
996 Some(config),
997 false,
998 )
999 .await
1000 .0
1001 }
1002
1003 async fn init_udp_inner(
1004 sender: SourceSender,
1005 source_key: &ComponentKey,
1006 shutdown_signal: ShutdownSignal,
1007 config: Option<UdpConfig>,
1008 use_vector_namespace: bool,
1009 ) -> (SocketAddr, JoinHandle<Result<(), ()>>) {
1010 let (guard, address, mut config) = match config {
1011 Some(config) => match config.address() {
1012 SocketListenAddr::SocketAddr(addr) => (None, addr, config),
1013 _ => panic!("listen address should not be systemd FD offset in tests"),
1014 },
1015 None => {
1016 let (guard, address) = next_addr();
1017 (
1018 Some(guard),
1019 address,
1020 UdpConfig::from_address(address.into()),
1021 )
1022 }
1023 };
1024
1025 let config = if use_vector_namespace {
1026 config.set_log_namespace(Some(true));
1027 config
1028 } else {
1029 config
1030 };
1031
1032 let server = SocketConfig::from(config)
1033 .build(SourceContext {
1034 key: source_key.clone(),
1035 globals: GlobalOptions::default(),
1036 enrichment_tables: Default::default(),
1037 shutdown: shutdown_signal,
1038 out: sender,
1039 proxy: Default::default(),
1040 acknowledgements: false,
1041 schema: Default::default(),
1042 schema_definitions: HashMap::default(),
1043 extra_context: Default::default(),
1044 })
1045 .await
1046 .unwrap();
1047 let source_handle = tokio::spawn(server);
1048
1049 tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
1051
1052 if let Some(guard) = guard {
1053 drop(guard)
1054 }
1055
1056 (address, source_handle)
1057 }
1058
1059 #[tokio::test]
1060 async fn udp_message() {
1061 assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async {
1062 let (tx, rx) = SourceSender::new_test();
1063 let address = init_udp(tx, false).await;
1064
1065 send_lines_udp(address, vec!["test".to_string()]).await;
1066 let events = collect_n(rx, 1).await;
1067
1068 assert_eq!(
1069 events[0].as_log()[log_schema().message_key().unwrap().to_string()],
1070 "test".into()
1071 );
1072 })
1073 .await;
1074 }
1075
1076 #[tokio::test]
1077 async fn udp_message_preserves_newline() {
1078 assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async {
1079 let (tx, rx) = SourceSender::new_test();
1080 let address = init_udp(tx, false).await;
1081
1082 send_lines_udp(address, vec!["foo\nbar".to_string()]).await;
1083 let events = collect_n(rx, 1).await;
1084
1085 assert_eq!(
1086 events[0].as_log()[log_schema().message_key().unwrap().to_string()],
1087 "foo\nbar".into()
1088 );
1089 })
1090 .await;
1091 }
1092
1093 #[tokio::test]
1094 async fn udp_multiple_packets() {
1095 assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async {
1096 let (tx, rx) = SourceSender::new_test();
1097 let address = init_udp(tx, false).await;
1098
1099 send_lines_udp(address, vec!["test".to_string(), "test2".to_string()]).await;
1100 let events = collect_n(rx, 2).await;
1101
1102 assert_eq!(
1103 events[0].as_log()[log_schema().message_key().unwrap().to_string()],
1104 "test".into()
1105 );
1106 assert_eq!(
1107 events[1].as_log()[log_schema().message_key().unwrap().to_string()],
1108 "test2".into()
1109 );
1110 })
1111 .await;
1112 }
1113
1114 #[tokio::test]
1115 async fn udp_max_length() {
1116 assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async {
1117 let (tx, rx) = SourceSender::new_test();
1118 let (_, address) = next_addr();
1119 let mut config = UdpConfig::from_address(address.into());
1120 config.max_length = 11;
1121 let address = init_udp_with_config(tx, config).await;
1122
1123 send_lines_udp(
1124 address,
1125 vec![
1126 "short line".to_string(),
1127 "test with a long line".to_string(),
1128 "a short un".to_string(),
1129 ],
1130 )
1131 .await;
1132
1133 let events = collect_n(rx, 2).await;
1134 assert_eq!(
1135 events[0].as_log()[log_schema().message_key().unwrap().to_string()],
1136 "short line".into()
1137 );
1138 assert_eq!(
1139 events[1].as_log()[log_schema().message_key().unwrap().to_string()],
1140 "a short un".into()
1141 );
1142 })
1143 .await;
1144 }
1145
1146 #[cfg(unix)]
1147 #[tokio::test]
1148 async fn udp_max_length_delimited() {
1153 assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async {
1154 let (tx, rx) = SourceSender::new_test();
1155 let (_, address) = next_addr();
1156 let mut config = UdpConfig::from_address(address.into());
1157 config.max_length = 10;
1158 config.framing = Some(
1159 CharacterDelimitedDecoderConfig {
1160 character_delimited: CharacterDelimitedDecoderOptions::new(b',', None),
1161 }
1162 .into(),
1163 );
1164 let address = init_udp_with_config(tx, config).await;
1165
1166 send_lines_udp(
1167 address,
1168 vec!["test with, long line".to_string(), "short one".to_string()],
1169 )
1170 .await;
1171
1172 let events = collect_n(rx, 2).await;
1173 assert_eq!(
1174 events[0].as_log()[log_schema().message_key().unwrap().to_string()],
1175 "test with".into()
1176 );
1177 assert_eq!(
1178 events[1].as_log()[log_schema().message_key().unwrap().to_string()],
1179 "short one".into()
1180 );
1181 })
1182 .await;
1183 }
1184
1185 #[tokio::test]
1186 async fn udp_decodes_chunked_gelf_messages() {
1187 assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async {
1188 let (tx, rx) = SourceSender::new_test();
1189 let (_, address) = next_addr();
1190 let mut config = UdpConfig::from_address(address.into());
1191 config.decoding = GelfDeserializerConfig::default().into();
1192 let address = init_udp_with_config(tx, config).await;
1193 let seed = 42;
1194 let mut rng = SmallRng::seed_from_u64(seed);
1195 let max_size = 300;
1196 let big_message = "This is a very large message".repeat(500);
1197 let another_big_message = "This is another very large message".repeat(500);
1198 let mut chunks = get_gelf_chunks(big_message.as_str(), max_size, &mut rng);
1199 let mut another_chunks =
1200 get_gelf_chunks(another_big_message.as_str(), max_size, &mut rng);
1201 chunks.append(&mut another_chunks);
1202 chunks.shuffle(&mut rng);
1203
1204 send_packets_udp(address, chunks).await;
1205
1206 let events = collect_n(rx, 2).await;
1207 assert_eq!(
1208 events[1].as_log()[log_schema().message_key().unwrap().to_string()],
1209 big_message.into()
1210 );
1211 assert_eq!(
1212 events[0].as_log()[log_schema().message_key().unwrap().to_string()],
1213 another_big_message.into()
1214 );
1215 })
1216 .await;
1217 }
1218
1219 #[tokio::test]
1220 async fn udp_it_includes_host() {
1221 assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async {
1222 let (tx, rx) = SourceSender::new_test();
1223 let address = init_udp(tx, false).await;
1224
1225 let from = send_lines_udp(address, vec!["test".to_string()]).await;
1226 let events = collect_n(rx, 1).await;
1227
1228 assert_eq!(
1229 events[0].as_log()["host"],
1230 from.local_addr().unwrap().ip().to_string().into()
1231 );
1232 assert_eq!(
1233 events[0].as_log()["port"],
1234 from.local_addr().unwrap().port().into()
1235 );
1236 })
1237 .await;
1238 }
1239
1240 #[tokio::test]
1241 async fn udp_it_includes_vector_namespaced_fields() {
1242 assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async {
1243 let (tx, rx) = SourceSender::new_test();
1244 let address = init_udp(tx, true).await;
1245
1246 let from = send_lines_udp(address, vec!["test".to_string()]).await;
1247 let events = collect_n(rx, 1).await;
1248 let log = events[0].as_log();
1249 let event_meta = log.metadata().value();
1250
1251 assert_eq!(log.value(), &"test".into());
1252 assert_eq!(
1253 event_meta.get(path!("vector", "source_type")).unwrap(),
1254 &value!(SocketConfig::NAME)
1255 );
1256 assert_eq!(
1257 event_meta.get(path!(SocketConfig::NAME, "host")).unwrap(),
1258 &value!(from.local_addr().unwrap().ip().to_string())
1259 );
1260 assert_eq!(
1261 event_meta.get(path!(SocketConfig::NAME, "port")).unwrap(),
1262 &value!(from.local_addr().unwrap().port())
1263 );
1264 })
1265 .await;
1266 }
1267
1268 #[tokio::test]
1269 async fn udp_it_includes_source_type() {
1270 assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async {
1271 let (tx, rx) = SourceSender::new_test();
1272 let address = init_udp(tx, false).await;
1273
1274 _ = send_lines_udp(address, vec!["test".to_string()]).await;
1275 let events = collect_n(rx, 1).await;
1276
1277 assert_eq!(
1278 events[0].as_log()[log_schema().source_type_key().unwrap().to_string()],
1279 "socket".into()
1280 );
1281 })
1282 .await;
1283 }
1284
1285 #[tokio::test]
1286 async fn udp_shutdown_simple() {
1287 assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async {
1288 let (tx, rx) = SourceSender::new_test();
1289 let source_id = ComponentKey::from("udp_shutdown_simple");
1290
1291 let mut shutdown = SourceShutdownCoordinator::default();
1292 let (address, source_handle) =
1293 init_udp_with_shutdown(tx, &source_id, &mut shutdown).await;
1294
1295 send_lines_udp(address, vec!["test".to_string()]).await;
1296 let events = collect_n(rx, 1).await;
1297
1298 assert_eq!(
1299 events[0].as_log()[log_schema().message_key().unwrap().to_string()],
1300 "test".into()
1301 );
1302
1303 let deadline = Instant::now() + Duration::from_secs(10);
1305 let shutdown_complete = shutdown.shutdown_source(&source_id, deadline);
1306 let shutdown_success = shutdown_complete.await;
1307 assert!(shutdown_success);
1308
1309 _ = source_handle.await.unwrap();
1311 })
1312 .await;
1313 }
1314
1315 #[tokio::test]
1316 async fn udp_shutdown_infinite_stream() {
1317 assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async {
1318 let (tx, rx) = SourceSender::new_test();
1319 let source_id = ComponentKey::from("udp_shutdown_infinite_stream");
1320
1321 let mut shutdown = SourceShutdownCoordinator::default();
1322 let (address, source_handle) =
1323 init_udp_with_shutdown(tx, &source_id, &mut shutdown).await;
1324
1325 let run_pump_atomic_sender = Arc::new(AtomicBool::new(true));
1327 let run_pump_atomic_receiver = Arc::clone(&run_pump_atomic_sender);
1328 let pump_handle = tokio::task::spawn_blocking(move || {
1329 let handle = tokio::runtime::Handle::current();
1330 handle.block_on(send_lines_udp(
1331 address,
1332 std::iter::repeat("test".to_string())
1333 .take_while(move |_| run_pump_atomic_receiver.load(Ordering::Relaxed)),
1334 ));
1335 });
1336
1337 let events = collect_n(rx, 100).await;
1339 assert_eq!(100, events.len());
1340 for event in events {
1341 assert_eq!(
1342 event.as_log()[log_schema().message_key().unwrap().to_string()],
1343 "test".into()
1344 );
1345 }
1346
1347 let deadline = Instant::now() + Duration::from_secs(10);
1348 let shutdown_complete = shutdown.shutdown_source(&source_id, deadline);
1349 let shutdown_success = shutdown_complete.await;
1350 assert!(shutdown_success);
1351
1352 _ = source_handle.await.unwrap();
1354
1355 run_pump_atomic_sender.store(false, Ordering::Relaxed);
1357 assert!(pump_handle.await.is_ok());
1358 })
1359 .await;
1360 }
1361
1362 #[tokio::test]
1363 async fn multicast_udp_message() {
1364 assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async {
1365 let (tx, mut rx) = SourceSender::new_test();
1366 let (_guard, socket_address) = next_addr_any();
1368 let multicast_ip_address: Ipv4Addr = "224.0.0.2".parse().unwrap();
1369 let multicast_socket_address =
1370 SocketAddr::new(IpAddr::V4(multicast_ip_address), socket_address.port());
1371 let mut config = UdpConfig::from_address(socket_address.into());
1372 config.multicast_groups = vec![multicast_ip_address];
1373 init_udp_with_config(tx, config).await;
1374
1375 send_lines_udp_from(
1379 bind_unused_udp_any(),
1380 multicast_socket_address,
1381 ["test".to_string()],
1382 );
1383
1384 let event = rx.next().await.expect("must receive an event");
1385 assert_eq!(
1386 event.as_log()[log_schema().message_key().unwrap().to_string()],
1387 "test".into()
1388 );
1389 })
1390 .await;
1391 }
1392
1393 #[tokio::test]
1394 async fn multiple_multicast_addresses_udp_message() {
1395 assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async {
1396 let (tx, mut rx) = SourceSender::new_test();
1397 let (_guard, socket_address) = next_addr_any();
1398 let multicast_ip_addresses = (2..12)
1399 .map(|i| format!("224.0.0.{i}").parse().unwrap())
1400 .collect::<Vec<Ipv4Addr>>();
1401 let multicast_ip_socket_addresses = multicast_ip_addresses
1402 .iter()
1403 .map(|ip_address| SocketAddr::new(IpAddr::V4(*ip_address), socket_address.port()))
1404 .collect::<Vec<SocketAddr>>();
1405 let mut config = UdpConfig::from_address(socket_address.into());
1406 config.multicast_groups = multicast_ip_addresses;
1407 init_udp_with_config(tx, config).await;
1408
1409 let mut from = bind_unused_udp_any();
1410 for multicast_ip_socket_address in multicast_ip_socket_addresses {
1411 from = send_lines_udp_from(
1412 from,
1413 multicast_ip_socket_address,
1414 [multicast_ip_socket_address.to_string()],
1415 );
1416
1417 let event = rx.next().await.expect("must receive an event");
1418 assert_eq!(
1419 event.as_log()[log_schema().message_key().unwrap().to_string()],
1420 multicast_ip_socket_address.to_string().into()
1421 );
1422 }
1423 })
1424 .await;
1425 }
1426
1427 #[tokio::test]
1428 async fn multicast_and_unicast_udp_message() {
1429 assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async {
1430 let (tx, mut rx) = SourceSender::new_test();
1431 let (_guard, socket_address) = next_addr_any();
1432 let multicast_ip_address: Ipv4Addr = "224.0.0.2".parse().unwrap();
1433 let multicast_socket_address =
1434 SocketAddr::new(IpAddr::V4(multicast_ip_address), socket_address.port());
1435 let mut config = UdpConfig::from_address(socket_address.into());
1436 config.multicast_groups = vec![multicast_ip_address];
1437 init_udp_with_config(tx, config).await;
1438
1439 let _ = send_lines_udp_from(
1441 bind_unused_udp_any(),
1442 multicast_socket_address,
1443 ["test".to_string()],
1444 );
1445 let event = rx.next().await.expect("must receive an event");
1446 assert_eq!(
1447 event.as_log()[log_schema().message_key().unwrap().to_string()],
1448 "test".into()
1449 );
1450
1451 let to = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), socket_address.port());
1454 send_lines_udp_from(bind_unused_udp(), to, ["test".to_string()]);
1458 let event = rx.next().await.expect("must receive an event");
1459 assert_eq!(
1460 event.as_log()[log_schema().message_key().unwrap().to_string()],
1461 "test".into()
1462 );
1463 })
1464 .await;
1465 }
1466
1467 #[tokio::test]
1468 async fn udp_invalid_multicast_group() {
1469 assert_source_error(&COMPONENT_ERROR_TAGS, async {
1470 let (tx, _rx) = SourceSender::new_test();
1471 let (_, socket_address) = next_addr_any();
1472 let invalid_multicast_ip_address: Ipv4Addr = "192.168.0.3".parse().unwrap();
1473 let mut config = UdpConfig::from_address(socket_address.into());
1474 config.multicast_groups = vec![invalid_multicast_ip_address];
1475 init_udp_with_config(tx, config).await;
1476 })
1477 .await;
1478 }
1479
1480 #[cfg(unix)]
1483 async fn init_unix(sender: SourceSender, stream: bool, use_vector_namespace: bool) -> PathBuf {
1484 init_unix_inner(sender, stream, use_vector_namespace, None).await
1485 }
1486
1487 #[cfg(unix)]
1488 async fn init_unix_with_config(
1489 sender: SourceSender,
1490 stream: bool,
1491 use_vector_namespace: bool,
1492 config: UnixConfig,
1493 ) -> PathBuf {
1494 init_unix_inner(sender, stream, use_vector_namespace, Some(config)).await
1495 }
1496
1497 #[cfg(unix)]
1498 async fn init_unix_inner(
1499 sender: SourceSender,
1500 stream: bool,
1501 use_vector_namespace: bool,
1502 config: Option<UnixConfig>,
1503 ) -> PathBuf {
1504 let mut config = config.unwrap_or_else(|| {
1505 UnixConfig::new(tempfile::tempdir().unwrap().keep().join("unix_test"))
1506 });
1507
1508 let in_path = config.path.clone();
1509
1510 if use_vector_namespace {
1511 config.log_namespace = Some(true);
1512 }
1513
1514 let mode = if stream {
1515 Mode::UnixStream(config)
1516 } else {
1517 Mode::UnixDatagram(config)
1518 };
1519
1520 let server = SocketConfig { mode }
1521 .build(SourceContext::new_test(sender, None))
1522 .await
1523 .unwrap();
1524 tokio::spawn(server);
1525
1526 while if stream {
1528 std::os::unix::net::UnixStream::connect(&in_path).is_err()
1529 } else {
1530 let socket = std::os::unix::net::UnixDatagram::unbound().unwrap();
1531 socket.connect(&in_path).is_err()
1532 } {
1533 yield_now().await;
1534 }
1535
1536 in_path
1537 }
1538
1539 #[cfg(unix)]
1540 async fn unix_send_lines(stream: bool, path: PathBuf, lines: &[&str]) {
1541 match stream {
1542 false => send_lines_unix_datagram(path, lines).await,
1543 true => send_lines_unix_stream(path, lines).await,
1544 }
1545 }
1546
1547 #[cfg(unix)]
1548 async fn unix_message(
1549 message: &str,
1550 stream: bool,
1551 use_vector_namespace: bool,
1552 ) -> (PathBuf, impl Stream<Item = Event> + use<>) {
1553 let (tx, rx) = SourceSender::new_test();
1554 let path = init_unix(tx, stream, use_vector_namespace).await;
1555 let path_clone = path.clone();
1556
1557 unix_send_lines(stream, path, &[message]).await;
1558
1559 (path_clone, rx)
1560 }
1561
1562 #[cfg(unix)]
1563 async fn unix_multiple_packets(stream: bool) {
1564 let (tx, rx) = SourceSender::new_test();
1565 let path = init_unix(tx, stream, false).await;
1566
1567 unix_send_lines(stream, path, &["test", "test2"]).await;
1568 let events = collect_n(rx, 2).await;
1569
1570 assert_eq!(2, events.len());
1571 assert_eq!(
1572 events[0].as_log()[log_schema().message_key().unwrap().to_string()],
1573 "test".into()
1574 );
1575 assert_eq!(
1576 events[1].as_log()[log_schema().message_key().unwrap().to_string()],
1577 "test2".into()
1578 );
1579 }
1580
1581 #[cfg(unix)]
1582 fn parses_unix_config(mode: &str) -> SocketConfig {
1583 toml::from_str::<SocketConfig>(&format!(
1584 r#"
1585 mode = "{mode}"
1586 path = "/does/not/exist"
1587 "#
1588 ))
1589 .unwrap()
1590 }
1591
1592 #[cfg(unix)]
1593 fn parses_unix_config_file_mode(mode: &str) -> SocketConfig {
1594 toml::from_str::<SocketConfig>(&format!(
1595 r#"
1596 mode = "{mode}"
1597 path = "/does/not/exist"
1598 socket_file_mode = 0o777
1599 "#
1600 ))
1601 .unwrap()
1602 }
1603
1604 #[cfg(unix)]
1606 async fn send_lines_unix_datagram(path: PathBuf, lines: &[&str]) {
1607 let packets = lines.iter().map(|line| Bytes::from(line.to_string()));
1608 send_packets_unix_datagram(path, packets).await;
1609 }
1610
1611 #[cfg(unix)]
1612 async fn send_packets_unix_datagram(path: PathBuf, packets: impl IntoIterator<Item = Bytes>) {
1613 let socket = UnixDatagram::unbound().unwrap();
1614 socket.connect(path).unwrap();
1615
1616 for packet in packets {
1617 socket.send(&packet).await.unwrap();
1618 }
1619 socket.shutdown(std::net::Shutdown::Both).unwrap();
1620 }
1621
1622 #[cfg(unix)]
1623 #[tokio::test]
1624 async fn unix_datagram_message() {
1625 assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async {
1626 let (_, rx) = unix_message("test", false, false).await;
1627 let events = collect_n(rx, 1).await;
1628
1629 assert_eq!(events.len(), 1);
1630 assert_eq!(
1631 events[0].as_log()[log_schema().message_key().unwrap().to_string()],
1632 "test".into()
1633 );
1634 assert_eq!(
1635 events[0].as_log()[log_schema().source_type_key().unwrap().to_string()],
1636 "socket".into()
1637 );
1638 assert_eq!(events[0].as_log()["host"], UNNAMED_SOCKET_HOST.into());
1639 })
1640 .await;
1641 }
1642
1643 #[ignore]
1644 #[cfg(unix)]
1645 #[tokio::test]
1646 async fn unix_datagram_socket_test() {
1647 use tempfile::tempdir;
1651 use tokio::net::UnixDatagram;
1652
1653 let tmp = tempdir().unwrap();
1654
1655 let tx_path = tmp.path().join("tx");
1656
1657 let tx_type = "bound";
1660
1661 let tx = if tx_type == "bound" {
1662 UnixDatagram::bind(&tx_path).unwrap()
1663 } else {
1664 UnixDatagram::unbound().unwrap()
1665 };
1666
1667 let rx_path = tmp.path().join("rx");
1673 let rx = UnixDatagram::bind(&rx_path).unwrap();
1674
1675 tx.connect(&rx_path).unwrap();
1677
1678 let bytes = b"hello world";
1680 tx.send(bytes).await.unwrap();
1681
1682 let mut buf = vec![0u8; 24];
1683 let (size, _) = rx.recv_from(&mut buf).await.unwrap();
1684
1685 let dgram = &buf[..size];
1686 assert_eq!(dgram, bytes);
1687 }
1688
1689 #[cfg(unix)]
1690 #[tokio::test]
1691 async fn unix_datagram_chunked_gelf_messages() {
1692 assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async {
1693 let (tx, rx) = SourceSender::new_test();
1694 let in_path = tempfile::tempdir().unwrap().keep().join("unix_test");
1695 let mut config = UnixConfig::new(in_path.clone());
1696 config.decoding = GelfDeserializerConfig::default().into();
1697 let path = init_unix_with_config(tx, false, false, config).await;
1698 let seed = 42;
1699 let mut rng = SmallRng::seed_from_u64(seed);
1700 let max_size = 20;
1701 let big_message = "This is a very large message".repeat(5);
1702 let another_big_message = "This is another very large message".repeat(5);
1703 let mut chunks = get_gelf_chunks(big_message.as_str(), max_size, &mut rng);
1704 let mut another_chunks =
1705 get_gelf_chunks(another_big_message.as_str(), max_size, &mut rng);
1706 chunks.append(&mut another_chunks);
1707 chunks.shuffle(&mut rng);
1708
1709 send_packets_unix_datagram(path, chunks).await;
1710
1711 let events = collect_n(rx, 2).await;
1712 assert_eq!(
1713 events[0].as_log()[log_schema().message_key().unwrap().to_string()],
1714 big_message.into()
1715 );
1716 assert_eq!(
1717 events[1].as_log()[log_schema().message_key().unwrap().to_string()],
1718 another_big_message.into()
1719 );
1720 })
1721 .await;
1722 }
1723
1724 #[cfg(unix)]
1725 #[tokio::test]
1726 async fn unix_datagram_message_with_vector_namespace() {
1727 assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async {
1728 let (_, rx) = unix_message("test", false, true).await;
1729 let events = collect_n(rx, 1).await;
1730 let log = events[0].as_log();
1731 let event_meta = log.metadata().value();
1732
1733 assert_eq!(log.value(), &"test".into());
1734 assert_eq!(events.len(), 1);
1735
1736 assert_eq!(
1737 event_meta.get(path!("vector", "source_type")).unwrap(),
1738 &value!(SocketConfig::NAME)
1739 );
1740
1741 assert_eq!(
1742 event_meta.get(path!(SocketConfig::NAME, "host")).unwrap(),
1743 &value!(UNNAMED_SOCKET_HOST)
1744 );
1745 })
1746 .await;
1747 }
1748
1749 #[cfg(unix)]
1750 #[tokio::test]
1751 async fn unix_datagram_message_preserves_newline() {
1752 assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async {
1753 let (_, rx) = unix_message("foo\nbar", false, false).await;
1754 let events = collect_n(rx, 1).await;
1755
1756 assert_eq!(events.len(), 1);
1757 assert_eq!(
1758 events[0].as_log()[log_schema().message_key().unwrap().to_string()],
1759 "foo\nbar".into()
1760 );
1761 assert_eq!(
1762 events[0].as_log()[log_schema().source_type_key().unwrap().to_string()],
1763 "socket".into()
1764 );
1765 })
1766 .await;
1767 }
1768
1769 #[cfg(unix)]
1770 #[tokio::test]
1771 async fn unix_datagram_multiple_packets() {
1772 assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async {
1773 unix_multiple_packets(false).await
1774 })
1775 .await;
1776 }
1777
1778 #[cfg(unix)]
1779 #[test]
1780 fn parses_unix_datagram_config() {
1781 let config = parses_unix_config("unix_datagram");
1782 assert!(matches!(config.mode, Mode::UnixDatagram { .. }));
1783 }
1784
1785 #[cfg(unix)]
1786 #[test]
1787 fn parses_unix_datagram_perms() {
1788 let config = parses_unix_config_file_mode("unix_datagram");
1789 assert!(matches!(config.mode, Mode::UnixDatagram { .. }));
1790 }
1791
1792 #[cfg(unix)]
1793 #[tokio::test]
1794 async fn unix_datagram_permissions() {
1795 let in_path = tempfile::tempdir().unwrap().keep().join("unix_test");
1796 let (tx, _) = SourceSender::new_test();
1797
1798 let mut config = UnixConfig::new(in_path.clone());
1799 config.socket_file_mode = Some(0o555);
1800 let mode = Mode::UnixDatagram(config);
1801 let server = SocketConfig { mode }
1802 .build(SourceContext::new_test(tx, None))
1803 .await
1804 .unwrap();
1805 tokio::spawn(server);
1806
1807 wait_for(|| {
1808 match std::fs::metadata(&in_path) {
1809 Ok(meta) => {
1810 match meta.permissions().mode() {
1811 0o140555 => ready(true),
1813 _ => ready(false),
1814 }
1815 }
1816 Err(_) => ready(false),
1817 }
1818 })
1819 .await;
1820 }
1821
1822 #[cfg(unix)]
1824 async fn send_lines_unix_stream(path: PathBuf, lines: &[&str]) {
1825 let socket = UnixStream::connect(path).await.unwrap();
1826 let mut sink = FramedWrite::new(socket, LinesCodec::new());
1827
1828 let lines = lines.iter().map(|s| Ok(s.to_string()));
1829 let lines = lines.collect::<Vec<_>>();
1830 sink.send_all(&mut stream::iter(lines)).await.unwrap();
1831
1832 let mut socket = sink.into_inner();
1833 socket.shutdown().await.unwrap();
1834 }
1835
1836 #[cfg(unix)]
1837 #[tokio::test]
1838 async fn unix_stream_message() {
1839 assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async {
1840 let (_, rx) = unix_message("test", true, false).await;
1841 let events = collect_n(rx, 1).await;
1842
1843 assert_eq!(1, events.len());
1844 assert_eq!(
1845 events[0].as_log()[log_schema().message_key().unwrap().to_string()],
1846 "test".into()
1847 );
1848 assert_eq!(
1849 events[0].as_log()[log_schema().source_type_key().unwrap().to_string()],
1850 "socket".into()
1851 );
1852 })
1853 .await;
1854 }
1855
1856 #[cfg(unix)]
1857 #[tokio::test]
1858 async fn unix_stream_message_with_vector_namespace() {
1859 assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async {
1860 let (_, rx) = unix_message("test", true, true).await;
1861 let events = collect_n(rx, 1).await;
1862 let log = events[0].as_log();
1863 let event_meta = log.metadata().value();
1864
1865 assert_eq!(log.value(), &"test".into());
1866 assert_eq!(1, events.len());
1867 assert_eq!(
1868 event_meta.get(path!("vector", "source_type")).unwrap(),
1869 &value!(SocketConfig::NAME)
1870 );
1871 assert_eq!(
1872 event_meta.get(path!(SocketConfig::NAME, "host")).unwrap(),
1873 &value!(UNNAMED_SOCKET_HOST)
1874 );
1875 })
1876 .await;
1877 }
1878
1879 #[cfg(unix)]
1880 #[tokio::test]
1881 async fn unix_stream_message_splits_on_newline() {
1882 assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async {
1883 let (_, rx) = unix_message("foo\nbar", true, false).await;
1884 let events = collect_n(rx, 2).await;
1885
1886 assert_eq!(events.len(), 2);
1887 assert_eq!(
1888 events[0].as_log()[log_schema().message_key().unwrap().to_string()],
1889 "foo".into()
1890 );
1891 assert_eq!(
1892 events[0].as_log()[log_schema().source_type_key().unwrap().to_string()],
1893 "socket".into()
1894 );
1895 assert_eq!(
1896 events[1].as_log()[log_schema().message_key().unwrap().to_string()],
1897 "bar".into()
1898 );
1899 assert_eq!(
1900 events[1].as_log()[log_schema().source_type_key().unwrap().to_string()],
1901 "socket".into()
1902 );
1903 })
1904 .await;
1905 }
1906
1907 #[cfg(unix)]
1908 #[tokio::test]
1909 async fn unix_stream_multiple_packets() {
1910 assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async {
1911 unix_multiple_packets(true).await
1912 })
1913 .await;
1914 }
1915
1916 #[cfg(unix)]
1917 #[test]
1918 fn parses_new_unix_stream_config() {
1919 let config = parses_unix_config("unix_stream");
1920 assert!(matches!(config.mode, Mode::UnixStream { .. }));
1921 }
1922
1923 #[cfg(unix)]
1924 #[test]
1925 fn parses_new_unix_datagram_perms() {
1926 let config = parses_unix_config_file_mode("unix_stream");
1927 assert!(matches!(config.mode, Mode::UnixStream { .. }));
1928 }
1929
1930 #[cfg(unix)]
1931 #[test]
1932 fn parses_old_unix_stream_config() {
1933 let config = parses_unix_config("unix");
1934 assert!(matches!(config.mode, Mode::UnixStream { .. }));
1935 }
1936
1937 #[cfg(unix)]
1938 #[tokio::test]
1939 async fn unix_stream_permissions() {
1940 let in_path = tempfile::tempdir().unwrap().keep().join("unix_test");
1941 let (tx, _) = SourceSender::new_test();
1942
1943 let mut config = UnixConfig::new(in_path.clone());
1944 config.socket_file_mode = Some(0o421);
1945 let mode = Mode::UnixStream(config);
1946 let server = SocketConfig { mode }
1947 .build(SourceContext::new_test(tx, None))
1948 .await
1949 .unwrap();
1950 tokio::spawn(server);
1951
1952 wait_for(|| {
1953 match std::fs::metadata(&in_path) {
1954 Ok(meta) => {
1955 match meta.permissions().mode() {
1956 0o140421 => ready(true),
1958 _ => ready(false),
1959 }
1960 }
1961 Err(_) => ready(false),
1962 }
1963 })
1964 .await;
1965 }
1966}