1pub mod tcp;
2pub mod udp;
3#[cfg(unix)]
4mod unix;
5
6use vector_lib::{
7 codecs::decoding::DeserializerConfig,
8 config::{LegacyKey, LogNamespace, log_schema},
9 configurable::configurable_component,
10 lookup::{lookup_v2::OptionalValuePath, owned_value_path},
11};
12use vrl::value::{Kind, kind::Collection};
13
14use crate::{
15 codecs::DecodingConfig,
16 config::{GenerateConfig, Resource, SourceConfig, SourceContext, SourceOutput},
17 sources::util::net::TcpSource,
18 tls::MaybeTlsSettings,
19};
20
/// Configuration for the `socket` source.
#[configurable_component(source("socket", "Collect logs over a socket."))]
#[derive(Clone, Debug)]
pub struct SocketConfig {
    // Flattened so the mode tag and the mode-specific options are read from the
    // same level as the rest of the source's configuration.
    #[serde(flatten)]
    pub mode: Mode,
}
28
/// Listening mode for the `socket` source.
///
/// The variant is selected by the `mode` tag in snake case (`tcp`, `udp`,
/// `unix_datagram`, `unix_stream`); the Unix variants exist only on Unix targets.
#[configurable_component]
#[derive(Clone, Debug)]
#[serde(tag = "mode", rename_all = "snake_case")]
#[configurable(metadata(docs::enum_tag_description = "The type of socket to use."))]
// NOTE(review): the size lint is silenced rather than boxing variants; presumably
// acceptable because this enum only carries configuration — confirm.
#[allow(clippy::large_enum_variant)]
pub enum Mode {
    /// Listen on TCP.
    Tcp(tcp::TcpConfig),

    /// Listen on UDP.
    Udp(udp::UdpConfig),

    /// Listen on a Unix domain socket in datagram mode.
    #[cfg(unix)]
    UnixDatagram(unix::UnixConfig),

    /// Listen on a Unix domain socket in stream mode.
    ///
    /// Also selectable as `mode = "unix"` through the serde alias.
    #[cfg(unix)]
    #[serde(alias = "unix")]
    UnixStream(unix::UnixConfig),
}
51
52impl SocketConfig {
53 pub fn new_tcp(tcp_config: tcp::TcpConfig) -> Self {
54 tcp_config.into()
55 }
56
57 pub fn make_basic_tcp_config(addr: std::net::SocketAddr) -> Self {
58 tcp::TcpConfig::from_address(addr.into()).into()
59 }
60
61 fn decoding(&self) -> DeserializerConfig {
62 match &self.mode {
63 Mode::Tcp(config) => config.decoding().clone(),
64 Mode::Udp(config) => config.decoding().clone(),
65 #[cfg(unix)]
66 Mode::UnixDatagram(config) => config.decoding().clone(),
67 #[cfg(unix)]
68 Mode::UnixStream(config) => config.decoding().clone(),
69 }
70 }
71
72 fn log_namespace(&self, global_log_namespace: LogNamespace) -> LogNamespace {
73 match &self.mode {
74 Mode::Tcp(config) => global_log_namespace.merge(config.log_namespace),
75 Mode::Udp(config) => global_log_namespace.merge(config.log_namespace),
76 #[cfg(unix)]
77 Mode::UnixDatagram(config) => global_log_namespace.merge(config.log_namespace),
78 #[cfg(unix)]
79 Mode::UnixStream(config) => global_log_namespace.merge(config.log_namespace),
80 }
81 }
82}
83
84impl From<tcp::TcpConfig> for SocketConfig {
85 fn from(config: tcp::TcpConfig) -> Self {
86 SocketConfig {
87 mode: Mode::Tcp(config),
88 }
89 }
90}
91
92impl From<udp::UdpConfig> for SocketConfig {
93 fn from(config: udp::UdpConfig) -> Self {
94 SocketConfig {
95 mode: Mode::Udp(config),
96 }
97 }
98}
99
impl GenerateConfig for SocketConfig {
    /// Produces the example configuration for this source: TCP mode bound to
    /// `0.0.0.0:9000`.
    ///
    /// # Panics
    ///
    /// Panics if the embedded TOML snippet fails to parse.
    fn generate_config() -> toml::Value {
        toml::from_str(
            r#"mode = "tcp"
            address = "0.0.0.0:9000""#,
        )
        .unwrap()
    }
}
109
110#[async_trait::async_trait]
111#[typetag::serde(name = "socket")]
112impl SourceConfig for SocketConfig {
113 async fn build(&self, cx: SourceContext) -> crate::Result<super::Source> {
114 match self.mode.clone() {
115 Mode::Tcp(config) => {
116 let log_namespace = cx.log_namespace(config.log_namespace);
117
118 let decoding = config.decoding().clone();
119 let decoder = DecodingConfig::new(
120 config
121 .framing
122 .clone()
123 .unwrap_or_else(|| decoding.default_stream_framing()),
124 decoding,
125 log_namespace,
126 )
127 .build()?;
128
129 let tcp = tcp::RawTcpSource::new(config.clone(), decoder, log_namespace);
130 let tls_config = config.tls().as_ref().map(|tls| tls.tls_config.clone());
131 let tls_client_metadata_key = config
132 .tls()
133 .as_ref()
134 .and_then(|tls| tls.client_metadata_key.clone())
135 .and_then(|k| k.path);
136 let tls = MaybeTlsSettings::from_config(tls_config.as_ref(), true)?;
137 tcp.run(
138 config.address(),
139 config.keepalive(),
140 config.shutdown_timeout_secs(),
141 tls,
142 tls_client_metadata_key,
143 config.receive_buffer_bytes(),
144 config.max_connection_duration_secs(),
145 cx,
146 false.into(),
147 config.connection_limit,
148 config.permit_origin.map(Into::into),
149 SocketConfig::NAME,
150 log_namespace,
151 )
152 }
153 Mode::Udp(config) => {
154 let log_namespace = cx.log_namespace(config.log_namespace);
155 let decoding = config.decoding().clone();
156 let framing = config
157 .framing()
158 .clone()
159 .unwrap_or_else(|| decoding.default_message_based_framing());
160 let decoder = DecodingConfig::new(framing, decoding, log_namespace).build()?;
161 Ok(udp::udp(
162 config,
163 decoder,
164 cx.shutdown,
165 cx.out,
166 log_namespace,
167 ))
168 }
169 #[cfg(unix)]
170 Mode::UnixDatagram(config) => {
171 let log_namespace = cx.log_namespace(config.log_namespace);
172 let decoding = config.decoding.clone();
173 let framing = config
174 .framing
175 .clone()
176 .unwrap_or_else(|| decoding.default_message_based_framing());
177 let decoder = DecodingConfig::new(framing, decoding, log_namespace).build()?;
178
179 unix::unix_datagram(config, decoder, cx.shutdown, cx.out, log_namespace)
180 }
181 #[cfg(unix)]
182 Mode::UnixStream(config) => {
183 let log_namespace = cx.log_namespace(config.log_namespace);
184
185 let decoding = config.decoding().clone();
186 let decoder = DecodingConfig::new(
187 config
188 .framing
189 .clone()
190 .unwrap_or_else(|| decoding.default_stream_framing()),
191 decoding,
192 log_namespace,
193 )
194 .build()?;
195
196 unix::unix_stream(config, decoder, cx.shutdown, cx.out, log_namespace)
197 }
198 }
199 }
200
201 fn outputs(&self, global_log_namespace: LogNamespace) -> Vec<SourceOutput> {
202 let log_namespace = self.log_namespace(global_log_namespace);
203
204 let schema_definition = self
205 .decoding()
206 .schema_definition(log_namespace)
207 .with_standard_vector_source_metadata();
208
209 let schema_definition = match &self.mode {
210 Mode::Tcp(config) => {
211 let legacy_host_key = config.host_key().path.map(LegacyKey::InsertIfEmpty);
212
213 let legacy_port_key = config.port_key().clone().path.map(LegacyKey::InsertIfEmpty);
214
215 let tls_client_metadata_path = config
216 .tls()
217 .as_ref()
218 .and_then(|tls| tls.client_metadata_key.as_ref())
219 .and_then(|k| k.path.clone())
220 .map(LegacyKey::Overwrite);
221
222 schema_definition
223 .with_source_metadata(
224 Self::NAME,
225 legacy_host_key,
226 &owned_value_path!("host"),
227 Kind::bytes(),
228 Some("host"),
229 )
230 .with_source_metadata(
231 Self::NAME,
232 legacy_port_key,
233 &owned_value_path!("port"),
234 Kind::integer(),
235 None,
236 )
237 .with_source_metadata(
238 Self::NAME,
239 tls_client_metadata_path,
240 &owned_value_path!("tls_client_metadata"),
241 Kind::object(Collection::empty().with_unknown(Kind::bytes()))
242 .or_undefined(),
243 None,
244 )
245 }
246 Mode::Udp(config) => {
247 let legacy_host_key = config.host_key().path.map(LegacyKey::InsertIfEmpty);
248
249 let legacy_port_key = config.port_key().clone().path.map(LegacyKey::InsertIfEmpty);
250
251 schema_definition
252 .with_source_metadata(
253 Self::NAME,
254 legacy_host_key,
255 &owned_value_path!("host"),
256 Kind::bytes(),
257 None,
258 )
259 .with_source_metadata(
260 Self::NAME,
261 legacy_port_key,
262 &owned_value_path!("port"),
263 Kind::integer(),
264 None,
265 )
266 }
267 #[cfg(unix)]
268 Mode::UnixDatagram(config) => {
269 let legacy_host_key = config.host_key().clone().path.map(LegacyKey::InsertIfEmpty);
270
271 schema_definition.with_source_metadata(
272 Self::NAME,
273 legacy_host_key,
274 &owned_value_path!("host"),
275 Kind::bytes(),
276 None,
277 )
278 }
279 #[cfg(unix)]
280 Mode::UnixStream(config) => {
281 let legacy_host_key = config.host_key().clone().path.map(LegacyKey::InsertIfEmpty);
282
283 schema_definition.with_source_metadata(
284 Self::NAME,
285 legacy_host_key,
286 &owned_value_path!("host"),
287 Kind::bytes(),
288 None,
289 )
290 }
291 };
292
293 vec![SourceOutput::new_maybe_logs(
294 self.decoding().output_type(),
295 schema_definition,
296 )]
297 }
298
299 fn resources(&self) -> Vec<Resource> {
300 match self.mode.clone() {
301 Mode::Tcp(tcp) => vec![tcp.address().as_tcp_resource()],
302 Mode::Udp(udp) => vec![udp.address().as_udp_resource()],
303 #[cfg(unix)]
304 Mode::UnixDatagram(_) => vec![],
305 #[cfg(unix)]
306 Mode::UnixStream(_) => vec![],
307 }
308 }
309
310 fn can_acknowledge(&self) -> bool {
311 false
312 }
313}
314
315pub(crate) fn default_host_key() -> OptionalValuePath {
316 log_schema().host_key().cloned().into()
317}
318
319#[cfg(test)]
320mod test {
321 use std::{
322 collections::HashMap,
323 net::{IpAddr, Ipv4Addr, SocketAddr, UdpSocket},
324 sync::{
325 Arc, LazyLock,
326 atomic::{AtomicBool, Ordering},
327 },
328 thread,
329 };
330
331 use approx::assert_relative_eq;
332 use bytes::{BufMut, Bytes, BytesMut};
333 use futures::{StreamExt, stream};
334 use portpicker::pick_unused_port;
335 use rand::{SeedableRng, rngs::SmallRng, seq::SliceRandom};
336 use serde_json::json;
337 use tokio::{
338 io::AsyncReadExt,
339 net::TcpStream,
340 sync::{Mutex, MutexGuard},
341 task::JoinHandle,
342 time::{Duration, Instant, timeout},
343 };
344 #[cfg(unix)]
345 use vector_lib::codecs::{
346 CharacterDelimitedDecoderConfig, decoding::CharacterDelimitedDecoderOptions,
347 };
348 use vector_lib::{
349 codecs::{GelfDeserializerConfig, NewlineDelimitedDecoderConfig},
350 event::EventContainer,
351 lookup::{lookup_v2::OptionalValuePath, owned_value_path, path},
352 };
353 use vrl::{btreemap, value, value::ObjectMap};
354 #[cfg(unix)]
355 use {
356 super::{Mode, unix::UnixConfig},
357 crate::sources::util::unix::UNNAMED_SOCKET_HOST,
358 crate::test_util::wait_for,
359 futures::{SinkExt, Stream},
360 std::future::ready,
361 std::os::unix::fs::PermissionsExt,
362 std::path::PathBuf,
363 tokio::{
364 io::AsyncWriteExt,
365 net::{UnixDatagram, UnixStream},
366 task::yield_now,
367 },
368 tokio_util::codec::{FramedWrite, LinesCodec},
369 };
370
371 use super::{SocketConfig, tcp::TcpConfig, udp::UdpConfig};
372 use crate::{
373 SourceSender,
374 config::{ComponentKey, GlobalOptions, SourceConfig, SourceContext, log_schema},
375 event::{Event, LogEvent},
376 shutdown::{ShutdownSignal, SourceShutdownCoordinator},
377 sinks::util::tcp::TcpSinkConfig,
378 sources::util::net::SocketListenAddr,
379 test_util::{
380 collect_n, collect_n_limited,
381 components::{
382 COMPONENT_ERROR_TAGS, SOCKET_PUSH_SOURCE_TAGS, assert_source_compliance,
383 assert_source_error,
384 },
385 random_string, send_lines, send_lines_tls, wait_for_tcp,
386 },
387 tls::{self, TlsConfig, TlsEnableableConfig, TlsSourceConfig},
388 };
389
    // Guard of the mutex that serializes test address selection (`ADDR_LOCK`).
    type Guard<'a> = MutexGuard<'a, ()>;
391
392 async fn wait_for_tcp_and_release<'a>(guard: Guard<'a>, addr: SocketAddr) {
393 wait_for_tcp(addr).await;
394 drop(guard) }
396
    // Async mutex taken by `next_addr_for_ip` before it picks a port; callers
    // receive the guard together with the address and decide when to release it.
    static ADDR_LOCK: LazyLock<Mutex<()>> = LazyLock::new(|| Mutex::new(()));
398 pub async fn next_addr_for_ip<'a>(ip: IpAddr) -> (Guard<'a>, SocketAddr) {
399 let guard = ADDR_LOCK.lock().await;
400 let port = pick_unused_port(ip);
401 (guard, SocketAddr::new(ip, port))
402 }
403
404 pub async fn next_addr_any<'a>() -> (Guard<'a>, SocketAddr) {
405 next_addr_for_ip(IpAddr::V4(Ipv4Addr::UNSPECIFIED)).await
406 }
407
408 pub async fn next_addr<'a>() -> (Guard<'a>, SocketAddr) {
409 next_addr_for_ip(IpAddr::V4(Ipv4Addr::LOCALHOST)).await
410 }
411
412 pub fn bind_unused_udp() -> UdpSocket {
413 portpicker::bind_unused_udp(IpAddr::V4(Ipv4Addr::LOCALHOST))
414 }
415
416 pub fn bind_unused_udp_any() -> UdpSocket {
417 portpicker::bind_unused_udp(IpAddr::V4(Ipv4Addr::UNSPECIFIED))
418 }
419
420 fn get_gelf_payload(message: &str) -> String {
421 serde_json::to_string(&json!({
422 "version": "1.1",
423 "host": "example.org",
424 "short_message": message,
425 "timestamp": 1234567890.123,
426 "level": 6,
427 "_foo": "bar",
428 }))
429 .unwrap()
430 }
431
432 fn create_gelf_chunk(
433 message_id: u64,
434 sequence_number: u8,
435 total_chunks: u8,
436 payload: &[u8],
437 ) -> Bytes {
438 const GELF_MAGIC: [u8; 2] = [0x1e, 0x0f];
439 let mut chunk = BytesMut::new();
440 chunk.put_slice(&GELF_MAGIC);
441 chunk.put_u64(message_id);
442 chunk.put_u8(sequence_number);
443 chunk.put_u8(total_chunks);
444 chunk.put(payload);
445 chunk.freeze()
446 }
447
448 fn get_gelf_chunks(short_message: &str, max_size: usize, rng: &mut SmallRng) -> Vec<Bytes> {
449 let message_id = rand::random();
450 let payload = get_gelf_payload(short_message);
451 let payload_chunks = payload.as_bytes().chunks(max_size).collect::<Vec<_>>();
452 let total_chunks = payload_chunks.len();
453 assert!(total_chunks <= 128, "too many gelf chunks");
454
455 let mut chunks = payload_chunks
456 .into_iter()
457 .enumerate()
458 .map(|(i, payload_chunk)| {
459 create_gelf_chunk(message_id, i as u8, total_chunks as u8, payload_chunk)
460 })
461 .collect::<Vec<_>>();
462 chunks.shuffle(rng);
464 chunks
465 }
466
    // Runs the shared `test_generate_config` check against `SocketConfig`.
    #[test]
    fn generate_config() {
        crate::test_util::test_generate_config::<SocketConfig>();
    }
471
472 #[tokio::test]
474 async fn tcp_it_includes_host() {
475 assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async {
476 let (tx, mut rx) = SourceSender::new_test();
477 let (guard, addr) = next_addr().await;
478
479 let server = SocketConfig::from(TcpConfig::from_address(addr.into()))
480 .build(SourceContext::new_test(tx, None))
481 .await
482 .unwrap();
483 tokio::spawn(server);
484
485 wait_for_tcp_and_release(guard, addr).await;
486
487 let addr = send_lines(addr, vec!["test".to_owned()].into_iter())
488 .await
489 .unwrap();
490
491 let event = rx.next().await.unwrap();
492
493 assert_eq!(event.as_log()["host"], addr.ip().to_string().into());
494 assert_eq!(event.as_log()["port"], addr.port().into());
495 })
496 .await;
497 }
498
499 #[tokio::test]
500 async fn tcp_it_includes_vector_namespaced_fields() {
501 assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async {
502 let (tx, mut rx) = SourceSender::new_test();
503 let (guard, addr) = next_addr().await;
504 let mut conf = TcpConfig::from_address(addr.into());
505 conf.set_log_namespace(Some(true));
506
507 let server = SocketConfig::from(conf)
508 .build(SourceContext::new_test(tx, None))
509 .await
510 .unwrap();
511 tokio::spawn(server);
512
513 wait_for_tcp_and_release(guard, addr).await;
514
515 let addr = send_lines(addr, vec!["test".to_owned()].into_iter())
516 .await
517 .unwrap();
518
519 let event = rx.next().await.unwrap();
520 let log = event.as_log();
521 let event_meta = log.metadata().value();
522
523 assert_eq!(log.value(), &"test".into());
524 assert_eq!(
525 event_meta.get(path!("vector", "source_type")).unwrap(),
526 &value!(SocketConfig::NAME)
527 );
528 assert_eq!(
529 event_meta.get(path!(SocketConfig::NAME, "host")).unwrap(),
530 &value!(addr.ip().to_string())
531 );
532 assert_eq!(
533 event_meta.get(path!(SocketConfig::NAME, "port")).unwrap(),
534 &value!(addr.port())
535 );
536 })
537 .await;
538 }
539
540 #[tokio::test]
541 async fn tcp_splits_on_newline() {
542 assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async {
543 let (tx, rx) = SourceSender::new_test();
544 let (guard, addr) = next_addr().await;
545
546 let server = SocketConfig::from(TcpConfig::from_address(addr.into()))
547 .build(SourceContext::new_test(tx, None))
548 .await
549 .unwrap();
550 tokio::spawn(server);
551
552 wait_for_tcp_and_release(guard, addr).await;
553
554 send_lines(addr, vec!["foo\nbar".to_owned()].into_iter())
555 .await
556 .unwrap();
557
558 let events = collect_n(rx, 2).await;
559
560 assert_eq!(events.len(), 2);
561 assert_eq!(
562 events[0].as_log()[log_schema().message_key().unwrap().to_string()],
563 "foo".into()
564 );
565 assert_eq!(
566 events[1].as_log()[log_schema().message_key().unwrap().to_string()],
567 "bar".into()
568 );
569 })
570 .await;
571 }
572
573 #[tokio::test]
574 async fn tcp_it_includes_source_type() {
575 assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async {
576 let (tx, mut rx) = SourceSender::new_test();
577 let (guard, addr) = next_addr().await;
578
579 let server = SocketConfig::from(TcpConfig::from_address(addr.into()))
580 .build(SourceContext::new_test(tx, None))
581 .await
582 .unwrap();
583 tokio::spawn(server);
584
585 wait_for_tcp_and_release(guard, addr).await;
586 send_lines(addr, vec!["test".to_owned()].into_iter())
587 .await
588 .unwrap();
589
590 let event = rx.next().await.unwrap();
591 assert_eq!(
592 event.as_log()[log_schema().source_type_key().unwrap().to_string()],
593 "socket".into()
594 );
595 })
596 .await;
597 }
598
599 #[tokio::test]
600 async fn tcp_continue_after_long_line() {
601 assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async {
602 let (tx, mut rx) = SourceSender::new_test();
603 let (guard, addr) = next_addr().await;
604
605 let mut config = TcpConfig::from_address(addr.into());
606 config.set_framing(Some(
607 NewlineDelimitedDecoderConfig::new_with_max_length(10).into(),
608 ));
609
610 let server = SocketConfig::from(config)
611 .build(SourceContext::new_test(tx, None))
612 .await
613 .unwrap();
614 tokio::spawn(server);
615
616 let lines = vec![
617 "short".to_owned(),
618 "this is too long".to_owned(),
619 "more short".to_owned(),
620 ];
621
622 wait_for_tcp_and_release(guard, addr).await;
623 send_lines(addr, lines.into_iter()).await.unwrap();
624
625 let event = rx.next().await.unwrap();
626 assert_eq!(
627 event.as_log()[log_schema().message_key().unwrap().to_string()],
628 "short".into()
629 );
630
631 let event = rx.next().await.unwrap();
632 assert_eq!(
633 event.as_log()[log_schema().message_key().unwrap().to_string()],
634 "more short".into()
635 );
636 })
637 .await;
638 }
639
640 #[tokio::test]
641 async fn tcp_with_tls() {
642 assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async {
643 let (tx, mut rx) = SourceSender::new_test();
644 let (guard, addr) = next_addr().await;
645
646 let mut config = TcpConfig::from_address(addr.into());
647 config.set_tls(Some(TlsSourceConfig {
648 tls_config: TlsEnableableConfig {
649 enabled: Some(true),
650 options: TlsConfig {
651 verify_certificate: Some(true),
652 crt_file: Some(tls::TEST_PEM_CRT_PATH.into()),
653 key_file: Some(tls::TEST_PEM_KEY_PATH.into()),
654 ca_file: Some(tls::TEST_PEM_CA_PATH.into()),
655 ..Default::default()
656 },
657 },
658 client_metadata_key: Some(OptionalValuePath::from(owned_value_path!("tls_peer"))),
659 }));
660
661 let server = SocketConfig::from(config)
662 .build(SourceContext::new_test(tx, None))
663 .await
664 .unwrap();
665 tokio::spawn(server);
666
667 let lines = vec!["one line".to_owned(), "another line".to_owned()];
668
669 wait_for_tcp_and_release(guard, addr).await;
670 send_lines_tls(
671 addr,
672 "localhost".into(),
673 lines.into_iter(),
674 std::path::Path::new(tls::TEST_PEM_CA_PATH),
675 std::path::Path::new(tls::TEST_PEM_CLIENT_CRT_PATH),
676 std::path::Path::new(tls::TEST_PEM_CLIENT_KEY_PATH),
677 )
678 .await
679 .unwrap();
680
681 let event = rx.next().await.unwrap();
682 assert_eq!(
683 event.as_log()[log_schema().message_key().unwrap().to_string()],
684 "one line".into()
685 );
686
687 let tls_meta: ObjectMap = btreemap!(
688 "subject" => "CN=localhost,OU=Vector,O=Datadog,L=New York,ST=New York,C=US"
689 );
690
691 assert_eq!(event.as_log()["tls_peer"], tls_meta.clone().into(),);
692
693 let event = rx.next().await.unwrap();
694 assert_eq!(
695 event.as_log()[log_schema().message_key().unwrap().to_string()],
696 "another line".into()
697 );
698
699 assert_eq!(event.as_log()["tls_peer"], tls_meta.clone().into(),);
700 })
701 .await;
702 }
703
704 #[tokio::test]
705 async fn tcp_with_tls_vector_namespace() {
706 assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async {
707 let (tx, mut rx) = SourceSender::new_test();
708 let (guard, addr) = next_addr().await;
709
710 let mut config = TcpConfig::from_address(addr.into());
711 config.set_tls(Some(TlsSourceConfig {
712 tls_config: TlsEnableableConfig {
713 enabled: Some(true),
714 options: TlsConfig {
715 verify_certificate: Some(true),
716 crt_file: Some(tls::TEST_PEM_CRT_PATH.into()),
717 key_file: Some(tls::TEST_PEM_KEY_PATH.into()),
718 ca_file: Some(tls::TEST_PEM_CA_PATH.into()),
719 ..Default::default()
720 },
721 },
722 client_metadata_key: None,
723 }));
724 config.log_namespace = Some(true);
725
726 let server = SocketConfig::from(config)
727 .build(SourceContext::new_test(tx, None))
728 .await
729 .unwrap();
730 tokio::spawn(server);
731
732 let lines = vec!["one line".to_owned(), "another line".to_owned()];
733
734 wait_for_tcp_and_release(guard, addr).await;
735 send_lines_tls(
736 addr,
737 "localhost".into(),
738 lines.into_iter(),
739 std::path::Path::new(tls::TEST_PEM_CA_PATH),
740 std::path::Path::new(tls::TEST_PEM_CLIENT_CRT_PATH),
741 std::path::Path::new(tls::TEST_PEM_CLIENT_KEY_PATH),
742 )
743 .await
744 .unwrap();
745
746 let event = rx.next().await.unwrap();
747 let log = event.as_log();
748 let event_meta = log.metadata().value();
749
750 assert_eq!(log.value(), &"one line".into());
751
752 let tls_meta: ObjectMap = btreemap!(
753 "subject" => "CN=localhost,OU=Vector,O=Datadog,L=New York,ST=New York,C=US"
754 );
755
756 assert_eq!(
757 event_meta
758 .get(path!(SocketConfig::NAME, "tls_client_metadata"))
759 .unwrap(),
760 &value!(tls_meta.clone())
761 );
762
763 let event = rx.next().await.unwrap();
764 let log = event.as_log();
765 let event_meta = log.metadata().value();
766
767 assert_eq!(log.value(), &"another line".into());
768
769 assert_eq!(
770 event_meta
771 .get(path!(SocketConfig::NAME, "tls_client_metadata"))
772 .unwrap(),
773 &value!(tls_meta.clone())
774 );
775 })
776 .await;
777 }
778
779 #[tokio::test]
780 async fn tcp_shutdown_simple() {
781 assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async {
782 let source_id = ComponentKey::from("tcp_shutdown_simple");
783 let (tx, mut rx) = SourceSender::new_test();
784 let (guard, addr) = next_addr().await;
785 let (cx, mut shutdown) = SourceContext::new_shutdown(&source_id, tx);
786
787 let server = SocketConfig::from(TcpConfig::from_address(addr.into()))
789 .build(cx)
790 .await
791 .unwrap();
792 let source_handle = tokio::spawn(server);
793
794 wait_for_tcp_and_release(guard, addr).await;
796 send_lines(addr, vec!["test".to_owned()].into_iter())
797 .await
798 .unwrap();
799
800 let event = rx.next().await.unwrap();
801 assert_eq!(
802 event.as_log()[log_schema().message_key().unwrap().to_string()],
803 "test".into()
804 );
805
806 let deadline = Instant::now() + Duration::from_secs(10);
808 let shutdown_complete = shutdown.shutdown_source(&source_id, deadline);
809 let shutdown_success = shutdown_complete.await;
810 assert!(shutdown_success);
811
812 _ = source_handle.await.unwrap();
814 })
815 .await;
816 }
817
    // Shuts the TCP source down while a sink keeps sending it an endless
    // stream of identical lines, and checks that shutdown still completes and
    // the source task returns `Ok(())`.
    #[tokio::test]
    async fn tcp_shutdown_infinite_stream() {
        let (guard, addr) = next_addr().await;

        // NOTE(review): the large sender buffer is presumably there so the source
        // does not stall on a full channel while the stream keeps coming — confirm.
        let (source_tx, source_rx) = SourceSender::new_test_sender_with_buffer(10_000);
        let source_key = ComponentKey::from("tcp_shutdown_infinite_stream");
        let (source_cx, mut shutdown) = SourceContext::new_shutdown(&source_key, source_tx);

        let mut source_config = TcpConfig::from_address(addr.into());
        source_config.set_shutdown_timeout_secs(1);
        let source_task = SocketConfig::from(source_config)
            .build(source_cx)
            .await
            .unwrap();

        let source_handle = tokio::spawn(source_task);
        wait_for_tcp_and_release(guard, addr).await;

        // Every line sent to the source is this same random 512-character string.
        let message = random_string(512);
        let message_bytes = Bytes::from(message.clone());

        // Encoder that ignores the event it is given and always writes the
        // fixed message followed by a newline.
        #[derive(Clone, Debug)]
        struct Serializer {
            bytes: Bytes,
        }
        impl tokio_util::codec::Encoder<Event> for Serializer {
            type Error = vector_lib::codecs::encoding::Error;

            fn encode(&mut self, _: Event, buffer: &mut BytesMut) -> Result<(), Self::Error> {
                buffer.put(self.bytes.as_ref());
                buffer.put_u8(b'\n');
                Ok(())
            }
        }
        let sink_config = TcpSinkConfig::from_address(format!("localhost:{}", addr.port()));
        let encoder = Serializer {
            bytes: message_bytes,
        };
        let (sink, _healthcheck) = sink_config.build(Default::default(), encoder).unwrap();

        // Feed the sink an endless stream of default log events in the background.
        tokio::spawn(async move {
            let input = stream::repeat_with(|| LogEvent::default().into()).boxed();
            sink.run(input).await.unwrap();
        });

        // Check that the source is actually forwarding the stream before
        // shutting it down.
        let events = collect_n_limited(source_rx, 100)
            .await
            .into_iter()
            .collect::<Vec<_>>();
        assert_eq!(100, events.len());

        let message_key = log_schema().message_key().unwrap().to_string();
        let expected_message = message.clone().into();
        for event in events.into_iter().flat_map(EventContainer::into_events) {
            assert_eq!(event.as_log()[message_key.as_str()], expected_message);
        }

        // Signal shutdown; the same ten second limit is used for the source's
        // deadline and for the timeout on the wait below.
        let shutdown_timeout_limit = Duration::from_secs(10);
        let deadline = Instant::now() + shutdown_timeout_limit;
        let shutdown_complete = shutdown.shutdown_source(&source_key, deadline);

        let shutdown_result = timeout(shutdown_timeout_limit, shutdown_complete).await;
        assert_eq!(shutdown_result, Ok(true));

        // The source task itself must have finished cleanly.
        let source_result = source_handle.await.expect("source task should not panic");
        assert_eq!(source_result, Ok(()));
    }
900
901 #[tokio::test]
902 async fn tcp_connection_close_after_max_duration() {
903 let (tx, _) = SourceSender::new_test();
904 let (guard, addr) = next_addr().await;
905
906 let mut source_config = TcpConfig::from_address(addr.into());
907 source_config.set_max_connection_duration_secs(Some(1));
908 let source_task = SocketConfig::from(source_config)
909 .build(SourceContext::new_test(tx, None))
910 .await
911 .unwrap();
912
913 drop(tokio::spawn(source_task));
915 wait_for_tcp_and_release(guard, addr).await;
916
917 let mut stream: TcpStream = TcpStream::connect(addr)
918 .await
919 .expect("stream should be able to connect");
920 let start = Instant::now();
921
922 let timeout = tokio::time::sleep(Duration::from_millis(1200));
923 let mut buffer = [0u8; 10];
924
925 tokio::select! {
926 _ = timeout => {
927 panic!("timed out waiting for stream to close")
928 },
929 read_result = stream.read(&mut buffer) => {
930 match read_result {
931 Ok(0) => assert_relative_eq!(start.elapsed().as_secs_f64(), 1.0, epsilon = 0.3),
933 Ok(_) => panic!("unexpectedly read data from stream"),
934 Err(e) => panic!("{e:}")
935 }
936 }
937 }
938 }
939
940 async fn send_lines_udp(to: SocketAddr, lines: impl IntoIterator<Item = String>) -> UdpSocket {
942 send_lines_udp_from(bind_unused_udp(), to, lines)
943 }
944
945 fn send_lines_udp_from(
946 from: UdpSocket,
947 to: SocketAddr,
948 lines: impl IntoIterator<Item = String>,
949 ) -> UdpSocket {
950 send_packets_udp_from(from, to, lines.into_iter().map(|line| line.into()))
951 }
952
953 async fn send_packets_udp(
954 to: SocketAddr,
955 packets: impl IntoIterator<Item = Bytes>,
956 ) -> UdpSocket {
957 send_packets_udp_from(bind_unused_udp(), to, packets)
958 }
959
960 fn send_packets_udp_from(
961 from: UdpSocket,
962 to: SocketAddr,
963 packets: impl IntoIterator<Item = Bytes>,
964 ) -> UdpSocket {
965 for packet in packets {
966 assert_eq!(
967 from.send_to(&packet, to)
968 .map_err(|error| panic!("{error:}"))
969 .ok()
970 .unwrap(),
971 packet.len()
972 );
973 thread::sleep(Duration::from_millis(1));
975 }
976
977 thread::sleep(Duration::from_millis(10));
979
980 from
982 }
983
984 async fn init_udp_with_shutdown(
985 sender: SourceSender,
986 source_id: &ComponentKey,
987 shutdown: &mut SourceShutdownCoordinator,
988 ) -> (SocketAddr, JoinHandle<Result<(), ()>>) {
989 let (shutdown_signal, _) = shutdown.register_source(source_id, false);
990 init_udp_inner(sender, source_id, shutdown_signal, None, false).await
991 }
992
993 async fn init_udp(sender: SourceSender, use_log_namespace: bool) -> SocketAddr {
994 init_udp_inner(
995 sender,
996 &ComponentKey::from("default"),
997 ShutdownSignal::noop(),
998 None,
999 use_log_namespace,
1000 )
1001 .await
1002 .0
1003 }
1004
1005 async fn init_udp_with_config(sender: SourceSender, config: UdpConfig) -> SocketAddr {
1006 init_udp_inner(
1007 sender,
1008 &ComponentKey::from("default"),
1009 ShutdownSignal::noop(),
1010 Some(config),
1011 false,
1012 )
1013 .await
1014 .0
1015 }
1016
1017 async fn init_udp_inner(
1018 sender: SourceSender,
1019 source_key: &ComponentKey,
1020 shutdown_signal: ShutdownSignal,
1021 config: Option<UdpConfig>,
1022 use_vector_namespace: bool,
1023 ) -> (SocketAddr, JoinHandle<Result<(), ()>>) {
1024 let (guard, address, mut config) = match config {
1025 Some(config) => match config.address() {
1026 SocketListenAddr::SocketAddr(addr) => (None, addr, config),
1027 _ => panic!("listen address should not be systemd FD offset in tests"),
1028 },
1029 None => {
1030 let (guard, address) = next_addr().await;
1031 (
1032 Some(guard),
1033 address,
1034 UdpConfig::from_address(address.into()),
1035 )
1036 }
1037 };
1038
1039 let config = if use_vector_namespace {
1040 config.set_log_namespace(Some(true));
1041 config
1042 } else {
1043 config
1044 };
1045
1046 let server = SocketConfig::from(config)
1047 .build(SourceContext {
1048 key: source_key.clone(),
1049 globals: GlobalOptions::default(),
1050 enrichment_tables: Default::default(),
1051 shutdown: shutdown_signal,
1052 out: sender,
1053 proxy: Default::default(),
1054 acknowledgements: false,
1055 schema: Default::default(),
1056 schema_definitions: HashMap::default(),
1057 extra_context: Default::default(),
1058 })
1059 .await
1060 .unwrap();
1061 let source_handle = tokio::spawn(server);
1062
1063 tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
1065
1066 if let Some(guard) = guard {
1067 drop(guard)
1068 }
1069
1070 (address, source_handle)
1071 }
1072
1073 #[tokio::test]
1074 async fn udp_message() {
1075 assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async {
1076 let (tx, rx) = SourceSender::new_test();
1077 let address = init_udp(tx, false).await;
1078
1079 send_lines_udp(address, vec!["test".to_string()]).await;
1080 let events = collect_n(rx, 1).await;
1081
1082 assert_eq!(
1083 events[0].as_log()[log_schema().message_key().unwrap().to_string()],
1084 "test".into()
1085 );
1086 })
1087 .await;
1088 }
1089
1090 #[tokio::test]
1091 async fn udp_message_preserves_newline() {
1092 assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async {
1093 let (tx, rx) = SourceSender::new_test();
1094 let address = init_udp(tx, false).await;
1095
1096 send_lines_udp(address, vec!["foo\nbar".to_string()]).await;
1097 let events = collect_n(rx, 1).await;
1098
1099 assert_eq!(
1100 events[0].as_log()[log_schema().message_key().unwrap().to_string()],
1101 "foo\nbar".into()
1102 );
1103 })
1104 .await;
1105 }
1106
1107 #[tokio::test]
1108 async fn udp_multiple_packets() {
1109 assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async {
1110 let (tx, rx) = SourceSender::new_test();
1111 let address = init_udp(tx, false).await;
1112
1113 send_lines_udp(address, vec!["test".to_string(), "test2".to_string()]).await;
1114 let events = collect_n(rx, 2).await;
1115
1116 assert_eq!(
1117 events[0].as_log()[log_schema().message_key().unwrap().to_string()],
1118 "test".into()
1119 );
1120 assert_eq!(
1121 events[1].as_log()[log_schema().message_key().unwrap().to_string()],
1122 "test2".into()
1123 );
1124 })
1125 .await;
1126 }
1127
1128 #[tokio::test]
1129 async fn udp_max_length() {
1130 assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async {
1131 let (tx, rx) = SourceSender::new_test();
1132 let (_, address) = next_addr().await;
1133 let mut config = UdpConfig::from_address(address.into());
1134 config.max_length = 11;
1135 let address = init_udp_with_config(tx, config).await;
1136
1137 send_lines_udp(
1138 address,
1139 vec![
1140 "short line".to_string(),
1141 "test with a long line".to_string(),
1142 "a short un".to_string(),
1143 ],
1144 )
1145 .await;
1146
1147 let events = collect_n(rx, 2).await;
1148 assert_eq!(
1149 events[0].as_log()[log_schema().message_key().unwrap().to_string()],
1150 "short line".into()
1151 );
1152 assert_eq!(
1153 events[1].as_log()[log_schema().message_key().unwrap().to_string()],
1154 "a short un".into()
1155 );
1156 })
1157 .await;
1158 }
1159
1160 #[cfg(unix)]
1161 #[tokio::test]
1162 async fn udp_max_length_delimited() {
1167 assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async {
1168 let (tx, rx) = SourceSender::new_test();
1169 let (_, address) = next_addr().await;
1170 let mut config = UdpConfig::from_address(address.into());
1171 config.max_length = 10;
1172 config.framing = Some(
1173 CharacterDelimitedDecoderConfig {
1174 character_delimited: CharacterDelimitedDecoderOptions::new(b',', None),
1175 }
1176 .into(),
1177 );
1178 let address = init_udp_with_config(tx, config).await;
1179
1180 send_lines_udp(
1181 address,
1182 vec!["test with, long line".to_string(), "short one".to_string()],
1183 )
1184 .await;
1185
1186 let events = collect_n(rx, 2).await;
1187 assert_eq!(
1188 events[0].as_log()[log_schema().message_key().unwrap().to_string()],
1189 "test with".into()
1190 );
1191 assert_eq!(
1192 events[1].as_log()[log_schema().message_key().unwrap().to_string()],
1193 "short one".into()
1194 );
1195 })
1196 .await;
1197 }
1198
1199 #[tokio::test]
1200 async fn udp_decodes_chunked_gelf_messages() {
1201 assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async {
1202 let (tx, rx) = SourceSender::new_test();
1203 let (_, address) = next_addr().await;
1204 let mut config = UdpConfig::from_address(address.into());
1205 config.decoding = GelfDeserializerConfig::default().into();
1206 let address = init_udp_with_config(tx, config).await;
1207 let seed = 42;
1208 let mut rng = SmallRng::seed_from_u64(seed);
1209 let max_size = 300;
1210 let big_message = "This is a very large message".repeat(500);
1211 let another_big_message = "This is another very large message".repeat(500);
1212 let mut chunks = get_gelf_chunks(big_message.as_str(), max_size, &mut rng);
1213 let mut another_chunks =
1214 get_gelf_chunks(another_big_message.as_str(), max_size, &mut rng);
1215 chunks.append(&mut another_chunks);
1216 chunks.shuffle(&mut rng);
1217
1218 send_packets_udp(address, chunks).await;
1219
1220 let events = collect_n(rx, 2).await;
1221 assert_eq!(
1222 events[1].as_log()[log_schema().message_key().unwrap().to_string()],
1223 big_message.into()
1224 );
1225 assert_eq!(
1226 events[0].as_log()[log_schema().message_key().unwrap().to_string()],
1227 another_big_message.into()
1228 );
1229 })
1230 .await;
1231 }
1232
1233 #[tokio::test]
1234 async fn udp_it_includes_host() {
1235 assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async {
1236 let (tx, rx) = SourceSender::new_test();
1237 let address = init_udp(tx, false).await;
1238
1239 let from = send_lines_udp(address, vec!["test".to_string()]).await;
1240 let events = collect_n(rx, 1).await;
1241
1242 assert_eq!(
1243 events[0].as_log()["host"],
1244 from.local_addr().unwrap().ip().to_string().into()
1245 );
1246 assert_eq!(
1247 events[0].as_log()["port"],
1248 from.local_addr().unwrap().port().into()
1249 );
1250 })
1251 .await;
1252 }
1253
1254 #[tokio::test]
1255 async fn udp_it_includes_vector_namespaced_fields() {
1256 assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async {
1257 let (tx, rx) = SourceSender::new_test();
1258 let address = init_udp(tx, true).await;
1259
1260 let from = send_lines_udp(address, vec!["test".to_string()]).await;
1261 let events = collect_n(rx, 1).await;
1262 let log = events[0].as_log();
1263 let event_meta = log.metadata().value();
1264
1265 assert_eq!(log.value(), &"test".into());
1266 assert_eq!(
1267 event_meta.get(path!("vector", "source_type")).unwrap(),
1268 &value!(SocketConfig::NAME)
1269 );
1270 assert_eq!(
1271 event_meta.get(path!(SocketConfig::NAME, "host")).unwrap(),
1272 &value!(from.local_addr().unwrap().ip().to_string())
1273 );
1274 assert_eq!(
1275 event_meta.get(path!(SocketConfig::NAME, "port")).unwrap(),
1276 &value!(from.local_addr().unwrap().port())
1277 );
1278 })
1279 .await;
1280 }
1281
1282 #[tokio::test]
1283 async fn udp_it_includes_source_type() {
1284 assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async {
1285 let (tx, rx) = SourceSender::new_test();
1286 let address = init_udp(tx, false).await;
1287
1288 _ = send_lines_udp(address, vec!["test".to_string()]).await;
1289 let events = collect_n(rx, 1).await;
1290
1291 assert_eq!(
1292 events[0].as_log()[log_schema().source_type_key().unwrap().to_string()],
1293 "socket".into()
1294 );
1295 })
1296 .await;
1297 }
1298
1299 #[tokio::test]
1300 async fn udp_shutdown_simple() {
1301 assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async {
1302 let (tx, rx) = SourceSender::new_test();
1303 let source_id = ComponentKey::from("udp_shutdown_simple");
1304
1305 let mut shutdown = SourceShutdownCoordinator::default();
1306 let (address, source_handle) =
1307 init_udp_with_shutdown(tx, &source_id, &mut shutdown).await;
1308
1309 send_lines_udp(address, vec!["test".to_string()]).await;
1310 let events = collect_n(rx, 1).await;
1311
1312 assert_eq!(
1313 events[0].as_log()[log_schema().message_key().unwrap().to_string()],
1314 "test".into()
1315 );
1316
1317 let deadline = Instant::now() + Duration::from_secs(10);
1319 let shutdown_complete = shutdown.shutdown_source(&source_id, deadline);
1320 let shutdown_success = shutdown_complete.await;
1321 assert!(shutdown_success);
1322
1323 _ = source_handle.await.unwrap();
1325 })
1326 .await;
1327 }
1328
    /// Shuts a UDP source down while a background task is still sending to it.
    ///
    /// A blocking task keeps sending `"test"` lines until a shared flag is cleared.
    /// The test collects 100 events, shuts the source down, waits for the source
    /// task, and only then clears the flag and joins the sender task.
    #[tokio::test]
    async fn udp_shutdown_infinite_stream() {
        assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async {
            let (tx, rx) = SourceSender::new_test();
            let source_id = ComponentKey::from("udp_shutdown_infinite_stream");

            let mut shutdown = SourceShutdownCoordinator::default();
            let (address, source_handle) =
                init_udp_with_shutdown(tx, &source_id, &mut shutdown).await;

            // Flag shared with the sender task: it keeps producing lines while `true`.
            let run_pump_atomic_sender = Arc::new(AtomicBool::new(true));
            let run_pump_atomic_receiver = Arc::clone(&run_pump_atomic_sender);
            // Drive the async sender from a blocking task via the current runtime handle.
            let pump_handle = tokio::task::spawn_blocking(move || {
                let handle = tokio::runtime::Handle::current();
                handle.block_on(send_lines_udp(
                    address,
                    // Endless stream of "test" lines that ends once the flag is cleared.
                    std::iter::repeat("test".to_string())
                        .take_while(move |_| run_pump_atomic_receiver.load(Ordering::Relaxed)),
                ));
            });

            // Check that the source is actually delivering events before shutting down.
            let events = collect_n(rx, 100).await;
            assert_eq!(100, events.len());
            for event in events {
                assert_eq!(
                    event.as_log()[log_schema().message_key().unwrap().to_string()],
                    "test".into()
                );
            }

            // Shutdown is requested while the sender task is still running.
            let deadline = Instant::now() + Duration::from_secs(10);
            let shutdown_complete = shutdown.shutdown_source(&source_id, deadline);
            let shutdown_success = shutdown_complete.await;
            assert!(shutdown_success);

            // Wait for the source task to finish.
            _ = source_handle.await.unwrap();

            // Only now stop the sender and make sure its task did not panic.
            run_pump_atomic_sender.store(false, Ordering::Relaxed);
            assert!(pump_handle.await.is_ok());
        })
        .await;
    }
1375
1376 #[tokio::test]
1377 async fn multicast_udp_message() {
1378 assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async {
1379 let (tx, mut rx) = SourceSender::new_test();
1380 let (guard, socket_address) = next_addr_any().await;
1382 let multicast_ip_address: Ipv4Addr = "224.0.0.2".parse().unwrap();
1383 let multicast_socket_address =
1384 SocketAddr::new(IpAddr::V4(multicast_ip_address), socket_address.port());
1385 let mut config = UdpConfig::from_address(socket_address.into());
1386 config.multicast_groups = vec![multicast_ip_address];
1387 init_udp_with_config(tx, config).await;
1388 drop(guard);
1389
1390 send_lines_udp_from(
1394 bind_unused_udp_any(),
1395 multicast_socket_address,
1396 ["test".to_string()],
1397 );
1398
1399 let event = rx.next().await.expect("must receive an event");
1400 assert_eq!(
1401 event.as_log()[log_schema().message_key().unwrap().to_string()],
1402 "test".into()
1403 );
1404 })
1405 .await;
1406 }
1407
1408 #[tokio::test]
1409 async fn multiple_multicast_addresses_udp_message() {
1410 assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async {
1411 let (tx, mut rx) = SourceSender::new_test();
1412 let (guard, socket_address) = next_addr_any().await;
1413 let multicast_ip_addresses = (2..12)
1414 .map(|i| format!("224.0.0.{i}").parse().unwrap())
1415 .collect::<Vec<Ipv4Addr>>();
1416 let multicast_ip_socket_addresses = multicast_ip_addresses
1417 .iter()
1418 .map(|ip_address| SocketAddr::new(IpAddr::V4(*ip_address), socket_address.port()))
1419 .collect::<Vec<SocketAddr>>();
1420 let mut config = UdpConfig::from_address(socket_address.into());
1421 config.multicast_groups = multicast_ip_addresses;
1422 init_udp_with_config(tx, config).await;
1423 drop(guard);
1424
1425 let mut from = bind_unused_udp_any();
1426 for multicast_ip_socket_address in multicast_ip_socket_addresses {
1427 from = send_lines_udp_from(
1428 from,
1429 multicast_ip_socket_address,
1430 [multicast_ip_socket_address.to_string()],
1431 );
1432
1433 let event = rx.next().await.expect("must receive an event");
1434 assert_eq!(
1435 event.as_log()[log_schema().message_key().unwrap().to_string()],
1436 multicast_ip_socket_address.to_string().into()
1437 );
1438 }
1439 })
1440 .await;
1441 }
1442
1443 #[tokio::test]
1444 async fn multicast_and_unicast_udp_message() {
1445 assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async {
1446 let (tx, mut rx) = SourceSender::new_test();
1447 let (guard, socket_address) = next_addr_any().await;
1448 let multicast_ip_address: Ipv4Addr = "224.0.0.2".parse().unwrap();
1449 let multicast_socket_address =
1450 SocketAddr::new(IpAddr::V4(multicast_ip_address), socket_address.port());
1451 let mut config = UdpConfig::from_address(socket_address.into());
1452 config.multicast_groups = vec![multicast_ip_address];
1453 init_udp_with_config(tx, config).await;
1454 drop(guard);
1455
1456 let _ = send_lines_udp_from(
1458 bind_unused_udp_any(),
1459 multicast_socket_address,
1460 ["test".to_string()],
1461 );
1462 let event = rx.next().await.expect("must receive an event");
1463 assert_eq!(
1464 event.as_log()[log_schema().message_key().unwrap().to_string()],
1465 "test".into()
1466 );
1467
1468 let to = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), socket_address.port());
1471 send_lines_udp_from(bind_unused_udp(), to, ["test".to_string()]);
1475 let event = rx.next().await.expect("must receive an event");
1476 assert_eq!(
1477 event.as_log()[log_schema().message_key().unwrap().to_string()],
1478 "test".into()
1479 );
1480 })
1481 .await;
1482 }
1483
1484 #[tokio::test]
1485 async fn udp_invalid_multicast_group() {
1486 assert_source_error(&COMPONENT_ERROR_TAGS, async {
1487 let (tx, _rx) = SourceSender::new_test();
1488 let (_, socket_address) = next_addr_any().await;
1489 let invalid_multicast_ip_address: Ipv4Addr = "192.168.0.3".parse().unwrap();
1490 let mut config = UdpConfig::from_address(socket_address.into());
1491 config.multicast_groups = vec![invalid_multicast_ip_address];
1492 init_udp_with_config(tx, config).await;
1493 })
1494 .await;
1495 }
1496
1497 #[cfg(unix)]
1500 async fn init_unix(sender: SourceSender, stream: bool, use_vector_namespace: bool) -> PathBuf {
1501 init_unix_inner(sender, stream, use_vector_namespace, None).await
1502 }
1503
1504 #[cfg(unix)]
1505 async fn init_unix_with_config(
1506 sender: SourceSender,
1507 stream: bool,
1508 use_vector_namespace: bool,
1509 config: UnixConfig,
1510 ) -> PathBuf {
1511 init_unix_inner(sender, stream, use_vector_namespace, Some(config)).await
1512 }
1513
1514 #[cfg(unix)]
1515 async fn init_unix_inner(
1516 sender: SourceSender,
1517 stream: bool,
1518 use_vector_namespace: bool,
1519 config: Option<UnixConfig>,
1520 ) -> PathBuf {
1521 let mut config = config.unwrap_or_else(|| {
1522 UnixConfig::new(tempfile::tempdir().unwrap().keep().join("unix_test"))
1523 });
1524
1525 let in_path = config.path.clone();
1526
1527 if use_vector_namespace {
1528 config.log_namespace = Some(true);
1529 }
1530
1531 let mode = if stream {
1532 Mode::UnixStream(config)
1533 } else {
1534 Mode::UnixDatagram(config)
1535 };
1536
1537 let server = SocketConfig { mode }
1538 .build(SourceContext::new_test(sender, None))
1539 .await
1540 .unwrap();
1541 tokio::spawn(server);
1542
1543 while if stream {
1545 std::os::unix::net::UnixStream::connect(&in_path).is_err()
1546 } else {
1547 let socket = std::os::unix::net::UnixDatagram::unbound().unwrap();
1548 socket.connect(&in_path).is_err()
1549 } {
1550 yield_now().await;
1551 }
1552
1553 in_path
1554 }
1555
1556 #[cfg(unix)]
1557 async fn unix_send_lines(stream: bool, path: PathBuf, lines: &[&str]) {
1558 match stream {
1559 false => send_lines_unix_datagram(path, lines).await,
1560 true => send_lines_unix_stream(path, lines).await,
1561 }
1562 }
1563
1564 #[cfg(unix)]
1565 async fn unix_message(
1566 message: &str,
1567 stream: bool,
1568 use_vector_namespace: bool,
1569 ) -> (PathBuf, impl Stream<Item = Event> + use<>) {
1570 let (tx, rx) = SourceSender::new_test();
1571 let path = init_unix(tx, stream, use_vector_namespace).await;
1572 let path_clone = path.clone();
1573
1574 unix_send_lines(stream, path, &[message]).await;
1575
1576 (path_clone, rx)
1577 }
1578
1579 #[cfg(unix)]
1580 async fn unix_multiple_packets(stream: bool) {
1581 let (tx, rx) = SourceSender::new_test();
1582 let path = init_unix(tx, stream, false).await;
1583
1584 unix_send_lines(stream, path, &["test", "test2"]).await;
1585 let events = collect_n(rx, 2).await;
1586
1587 assert_eq!(2, events.len());
1588 assert_eq!(
1589 events[0].as_log()[log_schema().message_key().unwrap().to_string()],
1590 "test".into()
1591 );
1592 assert_eq!(
1593 events[1].as_log()[log_schema().message_key().unwrap().to_string()],
1594 "test2".into()
1595 );
1596 }
1597
1598 #[cfg(unix)]
1599 fn parses_unix_config(mode: &str) -> SocketConfig {
1600 toml::from_str::<SocketConfig>(&format!(
1601 r#"
1602 mode = "{mode}"
1603 path = "/does/not/exist"
1604 "#
1605 ))
1606 .unwrap()
1607 }
1608
1609 #[cfg(unix)]
1610 fn parses_unix_config_file_mode(mode: &str) -> SocketConfig {
1611 toml::from_str::<SocketConfig>(&format!(
1612 r#"
1613 mode = "{mode}"
1614 path = "/does/not/exist"
1615 socket_file_mode = 0o777
1616 "#
1617 ))
1618 .unwrap()
1619 }
1620
1621 #[cfg(unix)]
1623 async fn send_lines_unix_datagram(path: PathBuf, lines: &[&str]) {
1624 let packets = lines.iter().map(|line| Bytes::from(line.to_string()));
1625 send_packets_unix_datagram(path, packets).await;
1626 }
1627
1628 #[cfg(unix)]
1629 async fn send_packets_unix_datagram(path: PathBuf, packets: impl IntoIterator<Item = Bytes>) {
1630 let socket = UnixDatagram::unbound().unwrap();
1631 socket.connect(path).unwrap();
1632
1633 for packet in packets {
1634 socket.send(&packet).await.unwrap();
1635 }
1636 socket.shutdown(std::net::Shutdown::Both).unwrap();
1637 }
1638
1639 #[cfg(unix)]
1640 #[tokio::test]
1641 async fn unix_datagram_message() {
1642 assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async {
1643 let (_, rx) = unix_message("test", false, false).await;
1644 let events = collect_n(rx, 1).await;
1645
1646 assert_eq!(events.len(), 1);
1647 assert_eq!(
1648 events[0].as_log()[log_schema().message_key().unwrap().to_string()],
1649 "test".into()
1650 );
1651 assert_eq!(
1652 events[0].as_log()[log_schema().source_type_key().unwrap().to_string()],
1653 "socket".into()
1654 );
1655 assert_eq!(events[0].as_log()["host"], UNNAMED_SOCKET_HOST.into());
1656 })
1657 .await;
1658 }
1659
1660 #[ignore]
1661 #[cfg(unix)]
1662 #[tokio::test]
1663 async fn unix_datagram_socket_test() {
1664 use tempfile::tempdir;
1668 use tokio::net::UnixDatagram;
1669
1670 let tmp = tempdir().unwrap();
1671
1672 let tx_path = tmp.path().join("tx");
1673
1674 let tx_type = "bound";
1677
1678 let tx = if tx_type == "bound" {
1679 UnixDatagram::bind(&tx_path).unwrap()
1680 } else {
1681 UnixDatagram::unbound().unwrap()
1682 };
1683
1684 let rx_path = tmp.path().join("rx");
1690 let rx = UnixDatagram::bind(&rx_path).unwrap();
1691
1692 tx.connect(&rx_path).unwrap();
1694
1695 let bytes = b"hello world";
1697 tx.send(bytes).await.unwrap();
1698
1699 let mut buf = vec![0u8; 24];
1700 let (size, _) = rx.recv_from(&mut buf).await.unwrap();
1701
1702 let dgram = &buf[..size];
1703 assert_eq!(dgram, bytes);
1704 }
1705
1706 #[cfg(unix)]
1707 #[tokio::test]
1708 async fn unix_datagram_chunked_gelf_messages() {
1709 assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async {
1710 let (tx, rx) = SourceSender::new_test();
1711 let in_path = tempfile::tempdir().unwrap().keep().join("unix_test");
1712 let mut config = UnixConfig::new(in_path.clone());
1713 config.decoding = GelfDeserializerConfig::default().into();
1714 let path = init_unix_with_config(tx, false, false, config).await;
1715 let seed = 42;
1716 let mut rng = SmallRng::seed_from_u64(seed);
1717 let max_size = 20;
1718 let big_message = "This is a very large message".repeat(5);
1719 let another_big_message = "This is another very large message".repeat(5);
1720 let mut chunks = get_gelf_chunks(big_message.as_str(), max_size, &mut rng);
1721 let mut another_chunks =
1722 get_gelf_chunks(another_big_message.as_str(), max_size, &mut rng);
1723 chunks.append(&mut another_chunks);
1724 chunks.shuffle(&mut rng);
1725
1726 send_packets_unix_datagram(path, chunks).await;
1727
1728 let events = collect_n(rx, 2).await;
1729 assert_eq!(
1730 events[0].as_log()[log_schema().message_key().unwrap().to_string()],
1731 big_message.into()
1732 );
1733 assert_eq!(
1734 events[1].as_log()[log_schema().message_key().unwrap().to_string()],
1735 another_big_message.into()
1736 );
1737 })
1738 .await;
1739 }
1740
1741 #[cfg(unix)]
1742 #[tokio::test]
1743 async fn unix_datagram_message_with_vector_namespace() {
1744 assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async {
1745 let (_, rx) = unix_message("test", false, true).await;
1746 let events = collect_n(rx, 1).await;
1747 let log = events[0].as_log();
1748 let event_meta = log.metadata().value();
1749
1750 assert_eq!(log.value(), &"test".into());
1751 assert_eq!(events.len(), 1);
1752
1753 assert_eq!(
1754 event_meta.get(path!("vector", "source_type")).unwrap(),
1755 &value!(SocketConfig::NAME)
1756 );
1757
1758 assert_eq!(
1759 event_meta.get(path!(SocketConfig::NAME, "host")).unwrap(),
1760 &value!(UNNAMED_SOCKET_HOST)
1761 );
1762 })
1763 .await;
1764 }
1765
1766 #[cfg(unix)]
1767 #[tokio::test]
1768 async fn unix_datagram_message_preserves_newline() {
1769 assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async {
1770 let (_, rx) = unix_message("foo\nbar", false, false).await;
1771 let events = collect_n(rx, 1).await;
1772
1773 assert_eq!(events.len(), 1);
1774 assert_eq!(
1775 events[0].as_log()[log_schema().message_key().unwrap().to_string()],
1776 "foo\nbar".into()
1777 );
1778 assert_eq!(
1779 events[0].as_log()[log_schema().source_type_key().unwrap().to_string()],
1780 "socket".into()
1781 );
1782 })
1783 .await;
1784 }
1785
1786 #[cfg(unix)]
1787 #[tokio::test]
1788 async fn unix_datagram_multiple_packets() {
1789 assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async {
1790 unix_multiple_packets(false).await
1791 })
1792 .await;
1793 }
1794
1795 #[cfg(unix)]
1796 #[test]
1797 fn parses_unix_datagram_config() {
1798 let config = parses_unix_config("unix_datagram");
1799 assert!(matches!(config.mode, Mode::UnixDatagram { .. }));
1800 }
1801
1802 #[cfg(unix)]
1803 #[test]
1804 fn parses_unix_datagram_perms() {
1805 let config = parses_unix_config_file_mode("unix_datagram");
1806 assert!(matches!(config.mode, Mode::UnixDatagram { .. }));
1807 }
1808
1809 #[cfg(unix)]
1810 #[tokio::test]
1811 async fn unix_datagram_permissions() {
1812 let in_path = tempfile::tempdir().unwrap().keep().join("unix_test");
1813 let (tx, _) = SourceSender::new_test();
1814
1815 let mut config = UnixConfig::new(in_path.clone());
1816 config.socket_file_mode = Some(0o555);
1817 let mode = Mode::UnixDatagram(config);
1818 let server = SocketConfig { mode }
1819 .build(SourceContext::new_test(tx, None))
1820 .await
1821 .unwrap();
1822 tokio::spawn(server);
1823
1824 wait_for(|| {
1825 match std::fs::metadata(&in_path) {
1826 Ok(meta) => {
1827 match meta.permissions().mode() {
1828 0o140555 => ready(true),
1830 _ => ready(false),
1831 }
1832 }
1833 Err(_) => ready(false),
1834 }
1835 })
1836 .await;
1837 }
1838
1839 #[cfg(unix)]
1841 async fn send_lines_unix_stream(path: PathBuf, lines: &[&str]) {
1842 let socket = UnixStream::connect(path).await.unwrap();
1843 let mut sink = FramedWrite::new(socket, LinesCodec::new());
1844
1845 let lines = lines.iter().map(|s| Ok(s.to_string()));
1846 let lines = lines.collect::<Vec<_>>();
1847 sink.send_all(&mut stream::iter(lines)).await.unwrap();
1848
1849 let mut socket = sink.into_inner();
1850 socket.shutdown().await.unwrap();
1851 }
1852
1853 #[cfg(unix)]
1854 #[tokio::test]
1855 async fn unix_stream_message() {
1856 assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async {
1857 let (_, rx) = unix_message("test", true, false).await;
1858 let events = collect_n(rx, 1).await;
1859
1860 assert_eq!(1, events.len());
1861 assert_eq!(
1862 events[0].as_log()[log_schema().message_key().unwrap().to_string()],
1863 "test".into()
1864 );
1865 assert_eq!(
1866 events[0].as_log()[log_schema().source_type_key().unwrap().to_string()],
1867 "socket".into()
1868 );
1869 })
1870 .await;
1871 }
1872
1873 #[cfg(unix)]
1874 #[tokio::test]
1875 async fn unix_stream_message_with_vector_namespace() {
1876 assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async {
1877 let (_, rx) = unix_message("test", true, true).await;
1878 let events = collect_n(rx, 1).await;
1879 let log = events[0].as_log();
1880 let event_meta = log.metadata().value();
1881
1882 assert_eq!(log.value(), &"test".into());
1883 assert_eq!(1, events.len());
1884 assert_eq!(
1885 event_meta.get(path!("vector", "source_type")).unwrap(),
1886 &value!(SocketConfig::NAME)
1887 );
1888 assert_eq!(
1889 event_meta.get(path!(SocketConfig::NAME, "host")).unwrap(),
1890 &value!(UNNAMED_SOCKET_HOST)
1891 );
1892 })
1893 .await;
1894 }
1895
1896 #[cfg(unix)]
1897 #[tokio::test]
1898 async fn unix_stream_message_splits_on_newline() {
1899 assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async {
1900 let (_, rx) = unix_message("foo\nbar", true, false).await;
1901 let events = collect_n(rx, 2).await;
1902
1903 assert_eq!(events.len(), 2);
1904 assert_eq!(
1905 events[0].as_log()[log_schema().message_key().unwrap().to_string()],
1906 "foo".into()
1907 );
1908 assert_eq!(
1909 events[0].as_log()[log_schema().source_type_key().unwrap().to_string()],
1910 "socket".into()
1911 );
1912 assert_eq!(
1913 events[1].as_log()[log_schema().message_key().unwrap().to_string()],
1914 "bar".into()
1915 );
1916 assert_eq!(
1917 events[1].as_log()[log_schema().source_type_key().unwrap().to_string()],
1918 "socket".into()
1919 );
1920 })
1921 .await;
1922 }
1923
1924 #[cfg(unix)]
1925 #[tokio::test]
1926 async fn unix_stream_multiple_packets() {
1927 assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async {
1928 unix_multiple_packets(true).await
1929 })
1930 .await;
1931 }
1932
1933 #[cfg(unix)]
1934 #[test]
1935 fn parses_new_unix_stream_config() {
1936 let config = parses_unix_config("unix_stream");
1937 assert!(matches!(config.mode, Mode::UnixStream { .. }));
1938 }
1939
1940 #[cfg(unix)]
1941 #[test]
1942 fn parses_new_unix_datagram_perms() {
1943 let config = parses_unix_config_file_mode("unix_stream");
1944 assert!(matches!(config.mode, Mode::UnixStream { .. }));
1945 }
1946
1947 #[cfg(unix)]
1948 #[test]
1949 fn parses_old_unix_stream_config() {
1950 let config = parses_unix_config("unix");
1951 assert!(matches!(config.mode, Mode::UnixStream { .. }));
1952 }
1953
1954 #[cfg(unix)]
1955 #[tokio::test]
1956 async fn unix_stream_permissions() {
1957 let in_path = tempfile::tempdir().unwrap().keep().join("unix_test");
1958 let (tx, _) = SourceSender::new_test();
1959
1960 let mut config = UnixConfig::new(in_path.clone());
1961 config.socket_file_mode = Some(0o421);
1962 let mode = Mode::UnixStream(config);
1963 let server = SocketConfig { mode }
1964 .build(SourceContext::new_test(tx, None))
1965 .await
1966 .unwrap();
1967 tokio::spawn(server);
1968
1969 wait_for(|| {
1970 match std::fs::metadata(&in_path) {
1971 Ok(meta) => {
1972 match meta.permissions().mode() {
1973 0o140421 => ready(true),
1975 _ => ready(false),
1976 }
1977 }
1978 Err(_) => ready(false),
1979 }
1980 })
1981 .await;
1982 }
1983}