1pub mod tcp;
2pub mod udp;
3#[cfg(unix)]
4mod unix;
5
6use vector_lib::codecs::decoding::DeserializerConfig;
7use vector_lib::config::{log_schema, LegacyKey, LogNamespace};
8use vector_lib::configurable::configurable_component;
9use vector_lib::lookup::{lookup_v2::OptionalValuePath, owned_value_path};
10use vrl::value::{kind::Collection, Kind};
11
12use crate::{
13 codecs::DecodingConfig,
14 config::{GenerateConfig, Resource, SourceConfig, SourceContext, SourceOutput},
15 sources::util::net::TcpSource,
16 tls::MaybeTlsSettings,
17};
18
19#[configurable_component(source("socket", "Collect logs over a socket."))]
21#[derive(Clone, Debug)]
22pub struct SocketConfig {
23 #[serde(flatten)]
24 pub mode: Mode,
25}
26
27#[configurable_component]
29#[derive(Clone, Debug)]
30#[serde(tag = "mode", rename_all = "snake_case")]
31#[configurable(metadata(docs::enum_tag_description = "The type of socket to use."))]
32#[allow(clippy::large_enum_variant)] pub enum Mode {
34 Tcp(tcp::TcpConfig),
36
37 Udp(udp::UdpConfig),
39
40 #[cfg(unix)]
42 UnixDatagram(unix::UnixConfig),
43
44 #[cfg(unix)]
46 #[serde(alias = "unix")]
47 UnixStream(unix::UnixConfig),
48}
49
50impl SocketConfig {
51 pub fn new_tcp(tcp_config: tcp::TcpConfig) -> Self {
52 tcp_config.into()
53 }
54
55 pub fn make_basic_tcp_config(addr: std::net::SocketAddr) -> Self {
56 tcp::TcpConfig::from_address(addr.into()).into()
57 }
58
59 fn decoding(&self) -> DeserializerConfig {
60 match &self.mode {
61 Mode::Tcp(config) => config.decoding().clone(),
62 Mode::Udp(config) => config.decoding().clone(),
63 #[cfg(unix)]
64 Mode::UnixDatagram(config) => config.decoding().clone(),
65 #[cfg(unix)]
66 Mode::UnixStream(config) => config.decoding().clone(),
67 }
68 }
69
70 fn log_namespace(&self, global_log_namespace: LogNamespace) -> LogNamespace {
71 match &self.mode {
72 Mode::Tcp(config) => global_log_namespace.merge(config.log_namespace),
73 Mode::Udp(config) => global_log_namespace.merge(config.log_namespace),
74 #[cfg(unix)]
75 Mode::UnixDatagram(config) => global_log_namespace.merge(config.log_namespace),
76 #[cfg(unix)]
77 Mode::UnixStream(config) => global_log_namespace.merge(config.log_namespace),
78 }
79 }
80}
81
82impl From<tcp::TcpConfig> for SocketConfig {
83 fn from(config: tcp::TcpConfig) -> Self {
84 SocketConfig {
85 mode: Mode::Tcp(config),
86 }
87 }
88}
89
90impl From<udp::UdpConfig> for SocketConfig {
91 fn from(config: udp::UdpConfig) -> Self {
92 SocketConfig {
93 mode: Mode::Udp(config),
94 }
95 }
96}
97
98impl GenerateConfig for SocketConfig {
99 fn generate_config() -> toml::Value {
100 toml::from_str(
101 r#"mode = "tcp"
102 address = "0.0.0.0:9000""#,
103 )
104 .unwrap()
105 }
106}
107
108#[async_trait::async_trait]
109#[typetag::serde(name = "socket")]
110impl SourceConfig for SocketConfig {
111 async fn build(&self, cx: SourceContext) -> crate::Result<super::Source> {
112 match self.mode.clone() {
113 Mode::Tcp(config) => {
114 let log_namespace = cx.log_namespace(config.log_namespace);
115
116 let decoding = config.decoding().clone();
117 let decoder = DecodingConfig::new(
118 config
119 .framing
120 .clone()
121 .unwrap_or_else(|| decoding.default_stream_framing()),
122 decoding,
123 log_namespace,
124 )
125 .build()?;
126
127 let tcp = tcp::RawTcpSource::new(config.clone(), decoder, log_namespace);
128 let tls_config = config.tls().as_ref().map(|tls| tls.tls_config.clone());
129 let tls_client_metadata_key = config
130 .tls()
131 .as_ref()
132 .and_then(|tls| tls.client_metadata_key.clone())
133 .and_then(|k| k.path);
134 let tls = MaybeTlsSettings::from_config(tls_config.as_ref(), true)?;
135 tcp.run(
136 config.address(),
137 config.keepalive(),
138 config.shutdown_timeout_secs(),
139 tls,
140 tls_client_metadata_key,
141 config.receive_buffer_bytes(),
142 config.max_connection_duration_secs(),
143 cx,
144 false.into(),
145 config.connection_limit,
146 config.permit_origin.map(Into::into),
147 SocketConfig::NAME,
148 log_namespace,
149 )
150 }
151 Mode::Udp(config) => {
152 let log_namespace = cx.log_namespace(config.log_namespace);
153 let decoding = config.decoding().clone();
154 let framing = config
155 .framing()
156 .clone()
157 .unwrap_or_else(|| decoding.default_message_based_framing());
158 let decoder = DecodingConfig::new(framing, decoding, log_namespace).build()?;
159 Ok(udp::udp(
160 config,
161 decoder,
162 cx.shutdown,
163 cx.out,
164 log_namespace,
165 ))
166 }
167 #[cfg(unix)]
168 Mode::UnixDatagram(config) => {
169 let log_namespace = cx.log_namespace(config.log_namespace);
170 let decoding = config.decoding.clone();
171 let framing = config
172 .framing
173 .clone()
174 .unwrap_or_else(|| decoding.default_message_based_framing());
175 let decoder = DecodingConfig::new(framing, decoding, log_namespace).build()?;
176
177 unix::unix_datagram(config, decoder, cx.shutdown, cx.out, log_namespace)
178 }
179 #[cfg(unix)]
180 Mode::UnixStream(config) => {
181 let log_namespace = cx.log_namespace(config.log_namespace);
182
183 let decoding = config.decoding().clone();
184 let decoder = DecodingConfig::new(
185 config
186 .framing
187 .clone()
188 .unwrap_or_else(|| decoding.default_stream_framing()),
189 decoding,
190 log_namespace,
191 )
192 .build()?;
193
194 unix::unix_stream(config, decoder, cx.shutdown, cx.out, log_namespace)
195 }
196 }
197 }
198
199 fn outputs(&self, global_log_namespace: LogNamespace) -> Vec<SourceOutput> {
200 let log_namespace = self.log_namespace(global_log_namespace);
201
202 let schema_definition = self
203 .decoding()
204 .schema_definition(log_namespace)
205 .with_standard_vector_source_metadata();
206
207 let schema_definition = match &self.mode {
208 Mode::Tcp(config) => {
209 let legacy_host_key = config.host_key().path.map(LegacyKey::InsertIfEmpty);
210
211 let legacy_port_key = config.port_key().clone().path.map(LegacyKey::InsertIfEmpty);
212
213 let tls_client_metadata_path = config
214 .tls()
215 .as_ref()
216 .and_then(|tls| tls.client_metadata_key.as_ref())
217 .and_then(|k| k.path.clone())
218 .map(LegacyKey::Overwrite);
219
220 schema_definition
221 .with_source_metadata(
222 Self::NAME,
223 legacy_host_key,
224 &owned_value_path!("host"),
225 Kind::bytes(),
226 Some("host"),
227 )
228 .with_source_metadata(
229 Self::NAME,
230 legacy_port_key,
231 &owned_value_path!("port"),
232 Kind::integer(),
233 None,
234 )
235 .with_source_metadata(
236 Self::NAME,
237 tls_client_metadata_path,
238 &owned_value_path!("tls_client_metadata"),
239 Kind::object(Collection::empty().with_unknown(Kind::bytes()))
240 .or_undefined(),
241 None,
242 )
243 }
244 Mode::Udp(config) => {
245 let legacy_host_key = config.host_key().path.map(LegacyKey::InsertIfEmpty);
246
247 let legacy_port_key = config.port_key().clone().path.map(LegacyKey::InsertIfEmpty);
248
249 schema_definition
250 .with_source_metadata(
251 Self::NAME,
252 legacy_host_key,
253 &owned_value_path!("host"),
254 Kind::bytes(),
255 None,
256 )
257 .with_source_metadata(
258 Self::NAME,
259 legacy_port_key,
260 &owned_value_path!("port"),
261 Kind::integer(),
262 None,
263 )
264 }
265 #[cfg(unix)]
266 Mode::UnixDatagram(config) => {
267 let legacy_host_key = config.host_key().clone().path.map(LegacyKey::InsertIfEmpty);
268
269 schema_definition.with_source_metadata(
270 Self::NAME,
271 legacy_host_key,
272 &owned_value_path!("host"),
273 Kind::bytes(),
274 None,
275 )
276 }
277 #[cfg(unix)]
278 Mode::UnixStream(config) => {
279 let legacy_host_key = config.host_key().clone().path.map(LegacyKey::InsertIfEmpty);
280
281 schema_definition.with_source_metadata(
282 Self::NAME,
283 legacy_host_key,
284 &owned_value_path!("host"),
285 Kind::bytes(),
286 None,
287 )
288 }
289 };
290
291 vec![SourceOutput::new_maybe_logs(
292 self.decoding().output_type(),
293 schema_definition,
294 )]
295 }
296
297 fn resources(&self) -> Vec<Resource> {
298 match self.mode.clone() {
299 Mode::Tcp(tcp) => vec![tcp.address().as_tcp_resource()],
300 Mode::Udp(udp) => vec![udp.address().as_udp_resource()],
301 #[cfg(unix)]
302 Mode::UnixDatagram(_) => vec![],
303 #[cfg(unix)]
304 Mode::UnixStream(_) => vec![],
305 }
306 }
307
308 fn can_acknowledge(&self) -> bool {
309 false
310 }
311}
312
313pub(crate) fn default_host_key() -> OptionalValuePath {
314 log_schema().host_key().cloned().into()
315}
316
317#[cfg(test)]
318mod test {
319 use approx::assert_relative_eq;
320 use std::{
321 collections::HashMap,
322 net::{IpAddr, Ipv4Addr, SocketAddr, UdpSocket},
323 sync::{
324 atomic::{AtomicBool, Ordering},
325 Arc,
326 },
327 thread,
328 };
329
330 use bytes::{BufMut, Bytes, BytesMut};
331 use futures::{stream, StreamExt};
332 use rand::{rngs::SmallRng, seq::SliceRandom, SeedableRng};
333 use serde_json::json;
334 use tokio::io::AsyncReadExt;
335 use tokio::net::TcpStream;
336 use tokio::{
337 task::JoinHandle,
338 time::{timeout, Duration, Instant},
339 };
340 #[cfg(unix)]
341 use vector_lib::codecs::{
342 decoding::CharacterDelimitedDecoderOptions, CharacterDelimitedDecoderConfig,
343 };
344 use vector_lib::codecs::{GelfDeserializerConfig, NewlineDelimitedDecoderConfig};
345 use vector_lib::event::EventContainer;
346 use vector_lib::lookup::{lookup_v2::OptionalValuePath, owned_value_path, path};
347 use vrl::value::ObjectMap;
348 use vrl::{btreemap, value};
349
350 #[cfg(unix)]
351 use {
352 super::{unix::UnixConfig, Mode},
353 crate::sources::util::unix::UNNAMED_SOCKET_HOST,
354 crate::test_util::wait_for,
355 futures::{SinkExt, Stream},
356 std::future::ready,
357 std::os::unix::fs::PermissionsExt,
358 std::path::PathBuf,
359 tokio::{
360 io::AsyncWriteExt,
361 net::{UnixDatagram, UnixStream},
362 task::yield_now,
363 },
364 tokio_util::codec::{FramedWrite, LinesCodec},
365 };
366
367 use super::{tcp::TcpConfig, udp::UdpConfig, SocketConfig};
368 use crate::{
369 config::{log_schema, ComponentKey, GlobalOptions, SourceConfig, SourceContext},
370 event::{Event, LogEvent},
371 shutdown::{ShutdownSignal, SourceShutdownCoordinator},
372 sinks::util::tcp::TcpSinkConfig,
373 sources::util::net::SocketListenAddr,
374 test_util::{
375 collect_n, collect_n_limited,
376 components::{
377 assert_source_compliance, assert_source_error, COMPONENT_ERROR_TAGS,
378 SOCKET_PUSH_SOURCE_TAGS,
379 },
380 next_addr, next_addr_any, random_string, send_lines, send_lines_tls, wait_for_tcp,
381 },
382 tls::{self, TlsConfig, TlsEnableableConfig, TlsSourceConfig},
383 SourceSender,
384 };
385
386 fn get_gelf_payload(message: &str) -> String {
387 serde_json::to_string(&json!({
388 "version": "1.1",
389 "host": "example.org",
390 "short_message": message,
391 "timestamp": 1234567890.123,
392 "level": 6,
393 "_foo": "bar",
394 }))
395 .unwrap()
396 }
397
398 fn create_gelf_chunk(
399 message_id: u64,
400 sequence_number: u8,
401 total_chunks: u8,
402 payload: &[u8],
403 ) -> Bytes {
404 const GELF_MAGIC: [u8; 2] = [0x1e, 0x0f];
405 let mut chunk = BytesMut::new();
406 chunk.put_slice(&GELF_MAGIC);
407 chunk.put_u64(message_id);
408 chunk.put_u8(sequence_number);
409 chunk.put_u8(total_chunks);
410 chunk.put(payload);
411 chunk.freeze()
412 }
413
414 fn get_gelf_chunks(short_message: &str, max_size: usize, rng: &mut SmallRng) -> Vec<Bytes> {
415 let message_id = rand::random();
416 let payload = get_gelf_payload(short_message);
417 let payload_chunks = payload.as_bytes().chunks(max_size).collect::<Vec<_>>();
418 let total_chunks = payload_chunks.len();
419 assert!(total_chunks <= 128, "too many gelf chunks");
420
421 let mut chunks = payload_chunks
422 .into_iter()
423 .enumerate()
424 .map(|(i, payload_chunk)| {
425 create_gelf_chunk(message_id, i as u8, total_chunks as u8, payload_chunk)
426 })
427 .collect::<Vec<_>>();
428 chunks.shuffle(rng);
430 chunks
431 }
432
433 #[test]
434 fn generate_config() {
435 crate::test_util::test_generate_config::<SocketConfig>();
436 }
437
438 #[tokio::test]
440 async fn tcp_it_includes_host() {
441 assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async {
442 let (tx, mut rx) = SourceSender::new_test();
443 let addr = next_addr();
444
445 let server = SocketConfig::from(TcpConfig::from_address(addr.into()))
446 .build(SourceContext::new_test(tx, None))
447 .await
448 .unwrap();
449 tokio::spawn(server);
450
451 wait_for_tcp(addr).await;
452 let addr = send_lines(addr, vec!["test".to_owned()].into_iter())
453 .await
454 .unwrap();
455
456 let event = rx.next().await.unwrap();
457
458 assert_eq!(event.as_log()["host"], addr.ip().to_string().into());
459 assert_eq!(event.as_log()["port"], addr.port().into());
460 })
461 .await;
462 }
463
464 #[tokio::test]
465 async fn tcp_it_includes_vector_namespaced_fields() {
466 assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async {
467 let (tx, mut rx) = SourceSender::new_test();
468 let addr = next_addr();
469 let mut conf = TcpConfig::from_address(addr.into());
470 conf.set_log_namespace(Some(true));
471
472 let server = SocketConfig::from(conf)
473 .build(SourceContext::new_test(tx, None))
474 .await
475 .unwrap();
476 tokio::spawn(server);
477
478 wait_for_tcp(addr).await;
479 let addr = send_lines(addr, vec!["test".to_owned()].into_iter())
480 .await
481 .unwrap();
482
483 let event = rx.next().await.unwrap();
484 let log = event.as_log();
485 let event_meta = log.metadata().value();
486
487 assert_eq!(log.value(), &"test".into());
488 assert_eq!(
489 event_meta.get(path!("vector", "source_type")).unwrap(),
490 &value!(SocketConfig::NAME)
491 );
492 assert_eq!(
493 event_meta.get(path!(SocketConfig::NAME, "host")).unwrap(),
494 &value!(addr.ip().to_string())
495 );
496 assert_eq!(
497 event_meta.get(path!(SocketConfig::NAME, "port")).unwrap(),
498 &value!(addr.port())
499 );
500 })
501 .await;
502 }
503
504 #[tokio::test]
505 async fn tcp_splits_on_newline() {
506 assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async {
507 let (tx, rx) = SourceSender::new_test();
508 let addr = next_addr();
509
510 let server = SocketConfig::from(TcpConfig::from_address(addr.into()))
511 .build(SourceContext::new_test(tx, None))
512 .await
513 .unwrap();
514 tokio::spawn(server);
515
516 wait_for_tcp(addr).await;
517 send_lines(addr, vec!["foo\nbar".to_owned()].into_iter())
518 .await
519 .unwrap();
520
521 let events = collect_n(rx, 2).await;
522
523 assert_eq!(events.len(), 2);
524 assert_eq!(
525 events[0].as_log()[log_schema().message_key().unwrap().to_string()],
526 "foo".into()
527 );
528 assert_eq!(
529 events[1].as_log()[log_schema().message_key().unwrap().to_string()],
530 "bar".into()
531 );
532 })
533 .await;
534 }
535
536 #[tokio::test]
537 async fn tcp_it_includes_source_type() {
538 assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async {
539 let (tx, mut rx) = SourceSender::new_test();
540 let addr = next_addr();
541
542 let server = SocketConfig::from(TcpConfig::from_address(addr.into()))
543 .build(SourceContext::new_test(tx, None))
544 .await
545 .unwrap();
546 tokio::spawn(server);
547
548 wait_for_tcp(addr).await;
549 send_lines(addr, vec!["test".to_owned()].into_iter())
550 .await
551 .unwrap();
552
553 let event = rx.next().await.unwrap();
554 assert_eq!(
555 event.as_log()[log_schema().source_type_key().unwrap().to_string()],
556 "socket".into()
557 );
558 })
559 .await;
560 }
561
562 #[tokio::test]
563 async fn tcp_continue_after_long_line() {
564 assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async {
565 let (tx, mut rx) = SourceSender::new_test();
566 let addr = next_addr();
567
568 let mut config = TcpConfig::from_address(addr.into());
569 config.set_framing(Some(
570 NewlineDelimitedDecoderConfig::new_with_max_length(10).into(),
571 ));
572
573 let server = SocketConfig::from(config)
574 .build(SourceContext::new_test(tx, None))
575 .await
576 .unwrap();
577 tokio::spawn(server);
578
579 let lines = vec![
580 "short".to_owned(),
581 "this is too long".to_owned(),
582 "more short".to_owned(),
583 ];
584
585 wait_for_tcp(addr).await;
586 send_lines(addr, lines.into_iter()).await.unwrap();
587
588 let event = rx.next().await.unwrap();
589 assert_eq!(
590 event.as_log()[log_schema().message_key().unwrap().to_string()],
591 "short".into()
592 );
593
594 let event = rx.next().await.unwrap();
595 assert_eq!(
596 event.as_log()[log_schema().message_key().unwrap().to_string()],
597 "more short".into()
598 );
599 })
600 .await;
601 }
602
603 #[tokio::test]
604 async fn tcp_with_tls() {
605 assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async {
606 let (tx, mut rx) = SourceSender::new_test();
607 let addr = next_addr();
608
609 let mut config = TcpConfig::from_address(addr.into());
610 config.set_tls(Some(TlsSourceConfig {
611 tls_config: TlsEnableableConfig {
612 enabled: Some(true),
613 options: TlsConfig {
614 verify_certificate: Some(true),
615 crt_file: Some(tls::TEST_PEM_CRT_PATH.into()),
616 key_file: Some(tls::TEST_PEM_KEY_PATH.into()),
617 ca_file: Some(tls::TEST_PEM_CA_PATH.into()),
618 ..Default::default()
619 },
620 },
621 client_metadata_key: Some(OptionalValuePath::from(owned_value_path!("tls_peer"))),
622 }));
623
624 let server = SocketConfig::from(config)
625 .build(SourceContext::new_test(tx, None))
626 .await
627 .unwrap();
628 tokio::spawn(server);
629
630 let lines = vec!["one line".to_owned(), "another line".to_owned()];
631
632 wait_for_tcp(addr).await;
633 send_lines_tls(
634 addr,
635 "localhost".into(),
636 lines.into_iter(),
637 std::path::Path::new(tls::TEST_PEM_CA_PATH),
638 std::path::Path::new(tls::TEST_PEM_CLIENT_CRT_PATH),
639 std::path::Path::new(tls::TEST_PEM_CLIENT_KEY_PATH),
640 )
641 .await
642 .unwrap();
643
644 let event = rx.next().await.unwrap();
645 assert_eq!(
646 event.as_log()[log_schema().message_key().unwrap().to_string()],
647 "one line".into()
648 );
649
650 let tls_meta: ObjectMap = btreemap!(
651 "subject" => "CN=localhost,OU=Vector,O=Datadog,L=New York,ST=New York,C=US"
652 );
653
654 assert_eq!(event.as_log()["tls_peer"], tls_meta.clone().into(),);
655
656 let event = rx.next().await.unwrap();
657 assert_eq!(
658 event.as_log()[log_schema().message_key().unwrap().to_string()],
659 "another line".into()
660 );
661
662 assert_eq!(event.as_log()["tls_peer"], tls_meta.clone().into(),);
663 })
664 .await;
665 }
666
667 #[tokio::test]
668 async fn tcp_with_tls_vector_namespace() {
669 assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async {
670 let (tx, mut rx) = SourceSender::new_test();
671 let addr = next_addr();
672
673 let mut config = TcpConfig::from_address(addr.into());
674 config.set_tls(Some(TlsSourceConfig {
675 tls_config: TlsEnableableConfig {
676 enabled: Some(true),
677 options: TlsConfig {
678 verify_certificate: Some(true),
679 crt_file: Some(tls::TEST_PEM_CRT_PATH.into()),
680 key_file: Some(tls::TEST_PEM_KEY_PATH.into()),
681 ca_file: Some(tls::TEST_PEM_CA_PATH.into()),
682 ..Default::default()
683 },
684 },
685 client_metadata_key: None,
686 }));
687 config.log_namespace = Some(true);
688
689 let server = SocketConfig::from(config)
690 .build(SourceContext::new_test(tx, None))
691 .await
692 .unwrap();
693 tokio::spawn(server);
694
695 let lines = vec!["one line".to_owned(), "another line".to_owned()];
696
697 wait_for_tcp(addr).await;
698 send_lines_tls(
699 addr,
700 "localhost".into(),
701 lines.into_iter(),
702 std::path::Path::new(tls::TEST_PEM_CA_PATH),
703 std::path::Path::new(tls::TEST_PEM_CLIENT_CRT_PATH),
704 std::path::Path::new(tls::TEST_PEM_CLIENT_KEY_PATH),
705 )
706 .await
707 .unwrap();
708
709 let event = rx.next().await.unwrap();
710 let log = event.as_log();
711 let event_meta = log.metadata().value();
712
713 assert_eq!(log.value(), &"one line".into());
714
715 let tls_meta: ObjectMap = btreemap!(
716 "subject" => "CN=localhost,OU=Vector,O=Datadog,L=New York,ST=New York,C=US"
717 );
718
719 assert_eq!(
720 event_meta
721 .get(path!(SocketConfig::NAME, "tls_client_metadata"))
722 .unwrap(),
723 &value!(tls_meta.clone())
724 );
725
726 let event = rx.next().await.unwrap();
727 let log = event.as_log();
728 let event_meta = log.metadata().value();
729
730 assert_eq!(log.value(), &"another line".into());
731
732 assert_eq!(
733 event_meta
734 .get(path!(SocketConfig::NAME, "tls_client_metadata"))
735 .unwrap(),
736 &value!(tls_meta.clone())
737 );
738 })
739 .await;
740 }
741
742 #[tokio::test]
743 async fn tcp_shutdown_simple() {
744 assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async {
745 let source_id = ComponentKey::from("tcp_shutdown_simple");
746 let (tx, mut rx) = SourceSender::new_test();
747 let addr = next_addr();
748 let (cx, mut shutdown) = SourceContext::new_shutdown(&source_id, tx);
749
750 let server = SocketConfig::from(TcpConfig::from_address(addr.into()))
752 .build(cx)
753 .await
754 .unwrap();
755 let source_handle = tokio::spawn(server);
756
757 wait_for_tcp(addr).await;
759 send_lines(addr, vec!["test".to_owned()].into_iter())
760 .await
761 .unwrap();
762
763 let event = rx.next().await.unwrap();
764 assert_eq!(
765 event.as_log()[log_schema().message_key().unwrap().to_string()],
766 "test".into()
767 );
768
769 let deadline = Instant::now() + Duration::from_secs(10);
771 let shutdown_complete = shutdown.shutdown_source(&source_id, deadline);
772 let shutdown_success = shutdown_complete.await;
773 assert!(shutdown_success);
774
775 _ = source_handle.await.unwrap();
777 })
778 .await;
779 }
780
781 #[tokio::test]
784 async fn tcp_shutdown_infinite_stream() {
785 let addr = next_addr();
790
791 let (source_tx, source_rx) = SourceSender::new_test_sender_with_buffer(10_000);
792 let source_key = ComponentKey::from("tcp_shutdown_infinite_stream");
793 let (source_cx, mut shutdown) = SourceContext::new_shutdown(&source_key, source_tx);
794
795 let mut source_config = TcpConfig::from_address(addr.into());
796 source_config.set_shutdown_timeout_secs(1);
797 let source_task = SocketConfig::from(source_config)
798 .build(source_cx)
799 .await
800 .unwrap();
801
802 let source_handle = tokio::spawn(source_task);
804 wait_for_tcp(addr).await;
805
806 let message = random_string(512);
810 let message_bytes = Bytes::from(message.clone());
811
812 #[derive(Clone, Debug)]
813 struct Serializer {
814 bytes: Bytes,
815 }
816 impl tokio_util::codec::Encoder<Event> for Serializer {
817 type Error = vector_lib::codecs::encoding::Error;
818
819 fn encode(&mut self, _: Event, buffer: &mut BytesMut) -> Result<(), Self::Error> {
820 buffer.put(self.bytes.as_ref());
821 buffer.put_u8(b'\n');
822 Ok(())
823 }
824 }
825 let sink_config = TcpSinkConfig::from_address(format!("localhost:{}", addr.port()));
826 let encoder = Serializer {
827 bytes: message_bytes,
828 };
829 let (sink, _healthcheck) = sink_config.build(Default::default(), encoder).unwrap();
830
831 tokio::spawn(async move {
832 let input = stream::repeat_with(|| LogEvent::default().into()).boxed();
833 sink.run(input).await.unwrap();
834 });
835
836 let events = collect_n_limited(source_rx, 100)
840 .await
841 .into_iter()
842 .collect::<Vec<_>>();
843 assert_eq!(100, events.len());
844
845 let message_key = log_schema().message_key().unwrap().to_string();
846 let expected_message = message.clone().into();
847 for event in events.into_iter().flat_map(EventContainer::into_events) {
848 assert_eq!(event.as_log()[message_key.as_str()], expected_message);
849 }
850
851 let shutdown_timeout_limit = Duration::from_secs(10);
854 let deadline = Instant::now() + shutdown_timeout_limit;
855 let shutdown_complete = shutdown.shutdown_source(&source_key, deadline);
856
857 let shutdown_result = timeout(shutdown_timeout_limit, shutdown_complete).await;
858 assert_eq!(shutdown_result, Ok(true));
859
860 let source_result = source_handle.await.expect("source task should not panic");
861 assert_eq!(source_result, Ok(()));
862 }
863
864 #[tokio::test]
865 async fn tcp_connection_close_after_max_duration() {
866 let (tx, _) = SourceSender::new_test();
867 let addr = next_addr();
868
869 let mut source_config = TcpConfig::from_address(addr.into());
870 source_config.set_max_connection_duration_secs(Some(1));
871 let source_task = SocketConfig::from(source_config)
872 .build(SourceContext::new_test(tx, None))
873 .await
874 .unwrap();
875
876 drop(tokio::spawn(source_task));
878 wait_for_tcp(addr).await;
879
880 let mut stream: TcpStream = TcpStream::connect(addr)
881 .await
882 .expect("stream should be able to connect");
883 let start = Instant::now();
884
885 let timeout = tokio::time::sleep(Duration::from_millis(1200));
886 let mut buffer = [0u8; 10];
887
888 tokio::select! {
889 _ = timeout => {
890 panic!("timed out waiting for stream to close")
891 },
892 read_result = stream.read(&mut buffer) => {
893 match read_result {
894 Ok(0) => assert_relative_eq!(start.elapsed().as_secs_f64(), 1.0, epsilon = 0.3),
896 Ok(_) => panic!("unexpectedly read data from stream"),
897 Err(e) => panic!("{e:}")
898 }
899 }
900 }
901 }
902
903 fn send_lines_udp(to: SocketAddr, lines: impl IntoIterator<Item = String>) -> SocketAddr {
905 send_lines_udp_from(next_addr(), to, lines)
906 }
907
908 fn send_lines_udp_from(
909 from: SocketAddr,
910 to: SocketAddr,
911 lines: impl IntoIterator<Item = String>,
912 ) -> SocketAddr {
913 send_packets_udp_from(from, to, lines.into_iter().map(|line| line.into()))
914 }
915
916 fn send_packets_udp(to: SocketAddr, packets: impl IntoIterator<Item = Bytes>) -> SocketAddr {
917 send_packets_udp_from(next_addr(), to, packets)
918 }
919
920 fn send_packets_udp_from(
921 from: SocketAddr,
922 to: SocketAddr,
923 packets: impl IntoIterator<Item = Bytes>,
924 ) -> SocketAddr {
925 let socket = UdpSocket::bind(from)
926 .map_err(|error| panic!("{error:}"))
927 .ok()
928 .unwrap();
929
930 for packet in packets {
931 assert_eq!(
932 socket
933 .send_to(&packet, to)
934 .map_err(|error| panic!("{error:}"))
935 .ok()
936 .unwrap(),
937 packet.len()
938 );
939 thread::sleep(Duration::from_millis(1));
941 }
942
943 thread::sleep(Duration::from_millis(10));
945
946 from
948 }
949
950 async fn init_udp_with_shutdown(
951 sender: SourceSender,
952 source_id: &ComponentKey,
953 shutdown: &mut SourceShutdownCoordinator,
954 ) -> (SocketAddr, JoinHandle<Result<(), ()>>) {
955 let (shutdown_signal, _) = shutdown.register_source(source_id, false);
956 init_udp_inner(sender, source_id, shutdown_signal, None, false).await
957 }
958
959 async fn init_udp(sender: SourceSender, use_log_namespace: bool) -> SocketAddr {
960 let (addr, _handle) = init_udp_inner(
961 sender,
962 &ComponentKey::from("default"),
963 ShutdownSignal::noop(),
964 None,
965 use_log_namespace,
966 )
967 .await;
968 addr
969 }
970
971 async fn init_udp_with_config(sender: SourceSender, config: UdpConfig) -> SocketAddr {
972 let (addr, _handle) = init_udp_inner(
973 sender,
974 &ComponentKey::from("default"),
975 ShutdownSignal::noop(),
976 Some(config),
977 false,
978 )
979 .await;
980 addr
981 }
982
983 async fn init_udp_inner(
984 sender: SourceSender,
985 source_key: &ComponentKey,
986 shutdown_signal: ShutdownSignal,
987 config: Option<UdpConfig>,
988 use_vector_namespace: bool,
989 ) -> (SocketAddr, JoinHandle<Result<(), ()>>) {
990 let (address, mut config) = match config {
991 Some(config) => match config.address() {
992 SocketListenAddr::SocketAddr(addr) => (addr, config),
993 _ => panic!("listen address should not be systemd FD offset in tests"),
994 },
995 None => {
996 let address = next_addr();
997 (address, UdpConfig::from_address(address.into()))
998 }
999 };
1000
1001 let config = if use_vector_namespace {
1002 config.set_log_namespace(Some(true));
1003 config
1004 } else {
1005 config
1006 };
1007
1008 let server = SocketConfig::from(config)
1009 .build(SourceContext {
1010 key: source_key.clone(),
1011 globals: GlobalOptions::default(),
1012 enrichment_tables: Default::default(),
1013 shutdown: shutdown_signal,
1014 out: sender,
1015 proxy: Default::default(),
1016 acknowledgements: false,
1017 schema: Default::default(),
1018 schema_definitions: HashMap::default(),
1019 extra_context: Default::default(),
1020 })
1021 .await
1022 .unwrap();
1023 let source_handle = tokio::spawn(server);
1024
1025 tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
1027
1028 (address, source_handle)
1029 }
1030
1031 #[tokio::test]
1032 async fn udp_message() {
1033 assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async {
1034 let (tx, rx) = SourceSender::new_test();
1035 let address = init_udp(tx, false).await;
1036
1037 send_lines_udp(address, vec!["test".to_string()]);
1038 let events = collect_n(rx, 1).await;
1039
1040 assert_eq!(
1041 events[0].as_log()[log_schema().message_key().unwrap().to_string()],
1042 "test".into()
1043 );
1044 })
1045 .await;
1046 }
1047
1048 #[tokio::test]
1049 async fn udp_message_preserves_newline() {
1050 assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async {
1051 let (tx, rx) = SourceSender::new_test();
1052 let address = init_udp(tx, false).await;
1053
1054 send_lines_udp(address, vec!["foo\nbar".to_string()]);
1055 let events = collect_n(rx, 1).await;
1056
1057 assert_eq!(
1058 events[0].as_log()[log_schema().message_key().unwrap().to_string()],
1059 "foo\nbar".into()
1060 );
1061 })
1062 .await;
1063 }
1064
1065 #[tokio::test]
1066 async fn udp_multiple_packets() {
1067 assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async {
1068 let (tx, rx) = SourceSender::new_test();
1069 let address = init_udp(tx, false).await;
1070
1071 send_lines_udp(address, vec!["test".to_string(), "test2".to_string()]);
1072 let events = collect_n(rx, 2).await;
1073
1074 assert_eq!(
1075 events[0].as_log()[log_schema().message_key().unwrap().to_string()],
1076 "test".into()
1077 );
1078 assert_eq!(
1079 events[1].as_log()[log_schema().message_key().unwrap().to_string()],
1080 "test2".into()
1081 );
1082 })
1083 .await;
1084 }
1085
1086 #[tokio::test]
1087 async fn udp_max_length() {
1088 assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async {
1089 let (tx, rx) = SourceSender::new_test();
1090 let address = next_addr();
1091 let mut config = UdpConfig::from_address(address.into());
1092 config.max_length = 11;
1093 let address = init_udp_with_config(tx, config).await;
1094
1095 send_lines_udp(
1096 address,
1097 vec![
1098 "short line".to_string(),
1099 "test with a long line".to_string(),
1100 "a short un".to_string(),
1101 ],
1102 );
1103
1104 let events = collect_n(rx, 2).await;
1105 assert_eq!(
1106 events[0].as_log()[log_schema().message_key().unwrap().to_string()],
1107 "short line".into()
1108 );
1109 assert_eq!(
1110 events[1].as_log()[log_schema().message_key().unwrap().to_string()],
1111 "a short un".into()
1112 );
1113 })
1114 .await;
1115 }
1116
1117 #[cfg(unix)]
1118 #[tokio::test]
1119 async fn udp_max_length_delimited() {
1124 assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async {
1125 let (tx, rx) = SourceSender::new_test();
1126 let address = next_addr();
1127 let mut config = UdpConfig::from_address(address.into());
1128 config.max_length = 10;
1129 config.framing = Some(
1130 CharacterDelimitedDecoderConfig {
1131 character_delimited: CharacterDelimitedDecoderOptions::new(b',', None),
1132 }
1133 .into(),
1134 );
1135 let address = init_udp_with_config(tx, config).await;
1136
1137 send_lines_udp(
1138 address,
1139 vec!["test with, long line".to_string(), "short one".to_string()],
1140 );
1141
1142 let events = collect_n(rx, 2).await;
1143 assert_eq!(
1144 events[0].as_log()[log_schema().message_key().unwrap().to_string()],
1145 "test with".into()
1146 );
1147 assert_eq!(
1148 events[1].as_log()[log_schema().message_key().unwrap().to_string()],
1149 "short one".into()
1150 );
1151 })
1152 .await;
1153 }
1154
1155 #[tokio::test]
1156 async fn udp_decodes_chunked_gelf_messages() {
1157 assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async {
1158 let (tx, rx) = SourceSender::new_test();
1159 let address = next_addr();
1160 let mut config = UdpConfig::from_address(address.into());
1161 config.decoding = GelfDeserializerConfig::default().into();
1162 let address = init_udp_with_config(tx, config).await;
1163 let seed = 42;
1164 let mut rng = SmallRng::seed_from_u64(seed);
1165 let max_size = 300;
1166 let big_message = "This is a very large message".repeat(500);
1167 let another_big_message = "This is another very large message".repeat(500);
1168 let mut chunks = get_gelf_chunks(big_message.as_str(), max_size, &mut rng);
1169 let mut another_chunks =
1170 get_gelf_chunks(another_big_message.as_str(), max_size, &mut rng);
1171 chunks.append(&mut another_chunks);
1172 chunks.shuffle(&mut rng);
1173
1174 send_packets_udp(address, chunks);
1175
1176 let events = collect_n(rx, 2).await;
1177 assert_eq!(
1178 events[1].as_log()[log_schema().message_key().unwrap().to_string()],
1179 big_message.into()
1180 );
1181 assert_eq!(
1182 events[0].as_log()[log_schema().message_key().unwrap().to_string()],
1183 another_big_message.into()
1184 );
1185 })
1186 .await;
1187 }
1188
1189 #[tokio::test]
1190 async fn udp_it_includes_host() {
1191 assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async {
1192 let (tx, rx) = SourceSender::new_test();
1193 let address = init_udp(tx, false).await;
1194
1195 let from = send_lines_udp(address, vec!["test".to_string()]);
1196 let events = collect_n(rx, 1).await;
1197
1198 assert_eq!(events[0].as_log()["host"], from.ip().to_string().into());
1199 assert_eq!(events[0].as_log()["port"], from.port().into());
1200 })
1201 .await;
1202 }
1203
1204 #[tokio::test]
1205 async fn udp_it_includes_vector_namespaced_fields() {
1206 assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async {
1207 let (tx, rx) = SourceSender::new_test();
1208 let address = init_udp(tx, true).await;
1209
1210 let from = send_lines_udp(address, vec!["test".to_string()]);
1211 let events = collect_n(rx, 1).await;
1212 let log = events[0].as_log();
1213 let event_meta = log.metadata().value();
1214
1215 assert_eq!(log.value(), &"test".into());
1216 assert_eq!(
1217 event_meta.get(path!("vector", "source_type")).unwrap(),
1218 &value!(SocketConfig::NAME)
1219 );
1220 assert_eq!(
1221 event_meta.get(path!(SocketConfig::NAME, "host")).unwrap(),
1222 &value!(from.ip().to_string())
1223 );
1224 assert_eq!(
1225 event_meta.get(path!(SocketConfig::NAME, "port")).unwrap(),
1226 &value!(from.port())
1227 );
1228 })
1229 .await;
1230 }
1231
1232 #[tokio::test]
1233 async fn udp_it_includes_source_type() {
1234 assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async {
1235 let (tx, rx) = SourceSender::new_test();
1236 let address = init_udp(tx, false).await;
1237
1238 _ = send_lines_udp(address, vec!["test".to_string()]);
1239 let events = collect_n(rx, 1).await;
1240
1241 assert_eq!(
1242 events[0].as_log()[log_schema().source_type_key().unwrap().to_string()],
1243 "socket".into()
1244 );
1245 })
1246 .await;
1247 }
1248
1249 #[tokio::test]
1250 async fn udp_shutdown_simple() {
1251 assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async {
1252 let (tx, rx) = SourceSender::new_test();
1253 let source_id = ComponentKey::from("udp_shutdown_simple");
1254
1255 let mut shutdown = SourceShutdownCoordinator::default();
1256 let (address, source_handle) =
1257 init_udp_with_shutdown(tx, &source_id, &mut shutdown).await;
1258
1259 send_lines_udp(address, vec!["test".to_string()]);
1260 let events = collect_n(rx, 1).await;
1261
1262 assert_eq!(
1263 events[0].as_log()[log_schema().message_key().unwrap().to_string()],
1264 "test".into()
1265 );
1266
1267 let deadline = Instant::now() + Duration::from_secs(10);
1269 let shutdown_complete = shutdown.shutdown_source(&source_id, deadline);
1270 let shutdown_success = shutdown_complete.await;
1271 assert!(shutdown_success);
1272
1273 _ = source_handle.await.unwrap();
1275 })
1276 .await;
1277 }
1278
1279 #[tokio::test]
1280 async fn udp_shutdown_infinite_stream() {
1281 assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async {
1282 let (tx, rx) = SourceSender::new_test();
1283 let source_id = ComponentKey::from("udp_shutdown_infinite_stream");
1284
1285 let mut shutdown = SourceShutdownCoordinator::default();
1286 let (address, source_handle) =
1287 init_udp_with_shutdown(tx, &source_id, &mut shutdown).await;
1288
1289 let run_pump_atomic_sender = Arc::new(AtomicBool::new(true));
1291 let run_pump_atomic_receiver = Arc::clone(&run_pump_atomic_sender);
1292 let pump_handle = std::thread::spawn(move || {
1293 send_lines_udp(
1294 address,
1295 std::iter::repeat("test".to_string())
1296 .take_while(move |_| run_pump_atomic_receiver.load(Ordering::Relaxed)),
1297 );
1298 });
1299
1300 let events = collect_n(rx, 100).await;
1302 assert_eq!(100, events.len());
1303 for event in events {
1304 assert_eq!(
1305 event.as_log()[log_schema().message_key().unwrap().to_string()],
1306 "test".into()
1307 );
1308 }
1309
1310 let deadline = Instant::now() + Duration::from_secs(10);
1311 let shutdown_complete = shutdown.shutdown_source(&source_id, deadline);
1312 let shutdown_success = shutdown_complete.await;
1313 assert!(shutdown_success);
1314
1315 _ = source_handle.await.unwrap();
1317
1318 run_pump_atomic_sender.store(false, Ordering::Relaxed);
1320 assert!(pump_handle.join().is_ok());
1321 })
1322 .await;
1323 }
1324
1325 #[tokio::test]
1326 async fn multicast_udp_message() {
1327 assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async {
1328 let (tx, mut rx) = SourceSender::new_test();
1329 let socket_address = next_addr_any();
1331 let multicast_ip_address: Ipv4Addr = "224.0.0.2".parse().unwrap();
1332 let multicast_socket_address =
1333 SocketAddr::new(IpAddr::V4(multicast_ip_address), socket_address.port());
1334 let mut config = UdpConfig::from_address(socket_address.into());
1335 config.multicast_groups = vec![multicast_ip_address];
1336 init_udp_with_config(tx, config).await;
1337
1338 let from = next_addr_any();
1342 send_lines_udp_from(from, multicast_socket_address, ["test".to_string()]);
1343
1344 let event = rx.next().await.expect("must receive an event");
1345 assert_eq!(
1346 event.as_log()[log_schema().message_key().unwrap().to_string()],
1347 "test".into()
1348 );
1349 })
1350 .await;
1351 }
1352
1353 #[tokio::test]
1354 async fn multiple_multicast_addresses_udp_message() {
1355 assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async {
1356 let (tx, mut rx) = SourceSender::new_test();
1357 let socket_address = next_addr_any();
1358 let multicast_ip_addresses = (2..12)
1359 .map(|i| format!("224.0.0.{i}").parse().unwrap())
1360 .collect::<Vec<Ipv4Addr>>();
1361 let multicast_ip_socket_addresses = multicast_ip_addresses
1362 .iter()
1363 .map(|ip_address| SocketAddr::new(IpAddr::V4(*ip_address), socket_address.port()))
1364 .collect::<Vec<SocketAddr>>();
1365 let mut config = UdpConfig::from_address(socket_address.into());
1366 config.multicast_groups = multicast_ip_addresses;
1367 init_udp_with_config(tx, config).await;
1368
1369 let from = next_addr_any();
1370 for multicast_ip_socket_address in multicast_ip_socket_addresses {
1371 send_lines_udp_from(
1372 from,
1373 multicast_ip_socket_address,
1374 [multicast_ip_socket_address.to_string()],
1375 );
1376
1377 let event = rx.next().await.expect("must receive an event");
1378 assert_eq!(
1379 event.as_log()[log_schema().message_key().unwrap().to_string()],
1380 multicast_ip_socket_address.to_string().into()
1381 );
1382 }
1383 })
1384 .await;
1385 }
1386
1387 #[tokio::test]
1388 async fn multicast_and_unicast_udp_message() {
1389 assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async {
1390 let (tx, mut rx) = SourceSender::new_test();
1391 let socket_address = next_addr_any();
1392 let multicast_ip_address: Ipv4Addr = "224.0.0.2".parse().unwrap();
1393 let multicast_socket_address =
1394 SocketAddr::new(IpAddr::V4(multicast_ip_address), socket_address.port());
1395 let mut config = UdpConfig::from_address(socket_address.into());
1396 config.multicast_groups = vec![multicast_ip_address];
1397 init_udp_with_config(tx, config).await;
1398
1399 let from = next_addr_any();
1400 send_lines_udp_from(from, multicast_socket_address, ["test".to_string()]);
1402 let event = rx.next().await.expect("must receive an event");
1403 assert_eq!(
1404 event.as_log()[log_schema().message_key().unwrap().to_string()],
1405 "test".into()
1406 );
1407
1408 let to = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), socket_address.port());
1411 send_lines_udp_from(from, to, ["test".to_string()]);
1413 let event = rx.next().await.expect("must receive an event");
1414 assert_eq!(
1415 event.as_log()[log_schema().message_key().unwrap().to_string()],
1416 "test".into()
1417 );
1418 })
1419 .await;
1420 }
1421
1422 #[tokio::test]
1423 async fn udp_invalid_multicast_group() {
1424 assert_source_error(&COMPONENT_ERROR_TAGS, async {
1425 let (tx, _rx) = SourceSender::new_test();
1426 let socket_address = next_addr_any();
1427 let invalid_multicast_ip_address: Ipv4Addr = "192.168.0.3".parse().unwrap();
1428 let mut config = UdpConfig::from_address(socket_address.into());
1429 config.multicast_groups = vec![invalid_multicast_ip_address];
1430 init_udp_with_config(tx, config).await;
1431 })
1432 .await;
1433 }
1434
1435 #[cfg(unix)]
1438 async fn init_unix(sender: SourceSender, stream: bool, use_vector_namespace: bool) -> PathBuf {
1439 init_unix_inner(sender, stream, use_vector_namespace, None).await
1440 }
1441
1442 #[cfg(unix)]
1443 async fn init_unix_with_config(
1444 sender: SourceSender,
1445 stream: bool,
1446 use_vector_namespace: bool,
1447 config: UnixConfig,
1448 ) -> PathBuf {
1449 init_unix_inner(sender, stream, use_vector_namespace, Some(config)).await
1450 }
1451
1452 #[cfg(unix)]
1453 async fn init_unix_inner(
1454 sender: SourceSender,
1455 stream: bool,
1456 use_vector_namespace: bool,
1457 config: Option<UnixConfig>,
1458 ) -> PathBuf {
1459 let mut config = config.unwrap_or_else(|| {
1460 UnixConfig::new(tempfile::tempdir().unwrap().keep().join("unix_test"))
1461 });
1462
1463 let in_path = config.path.clone();
1464
1465 if use_vector_namespace {
1466 config.log_namespace = Some(true);
1467 }
1468
1469 let mode = if stream {
1470 Mode::UnixStream(config)
1471 } else {
1472 Mode::UnixDatagram(config)
1473 };
1474
1475 let server = SocketConfig { mode }
1476 .build(SourceContext::new_test(sender, None))
1477 .await
1478 .unwrap();
1479 tokio::spawn(server);
1480
1481 while if stream {
1483 std::os::unix::net::UnixStream::connect(&in_path).is_err()
1484 } else {
1485 let socket = std::os::unix::net::UnixDatagram::unbound().unwrap();
1486 socket.connect(&in_path).is_err()
1487 } {
1488 yield_now().await;
1489 }
1490
1491 in_path
1492 }
1493
1494 #[cfg(unix)]
1495 async fn unix_send_lines(stream: bool, path: PathBuf, lines: &[&str]) {
1496 match stream {
1497 false => send_lines_unix_datagram(path, lines).await,
1498 true => send_lines_unix_stream(path, lines).await,
1499 }
1500 }
1501
1502 #[cfg(unix)]
1503 async fn unix_message(
1504 message: &str,
1505 stream: bool,
1506 use_vector_namespace: bool,
1507 ) -> (PathBuf, impl Stream<Item = Event> + use<>) {
1508 let (tx, rx) = SourceSender::new_test();
1509 let path = init_unix(tx, stream, use_vector_namespace).await;
1510 let path_clone = path.clone();
1511
1512 unix_send_lines(stream, path, &[message]).await;
1513
1514 (path_clone, rx)
1515 }
1516
1517 #[cfg(unix)]
1518 async fn unix_multiple_packets(stream: bool) {
1519 let (tx, rx) = SourceSender::new_test();
1520 let path = init_unix(tx, stream, false).await;
1521
1522 unix_send_lines(stream, path, &["test", "test2"]).await;
1523 let events = collect_n(rx, 2).await;
1524
1525 assert_eq!(2, events.len());
1526 assert_eq!(
1527 events[0].as_log()[log_schema().message_key().unwrap().to_string()],
1528 "test".into()
1529 );
1530 assert_eq!(
1531 events[1].as_log()[log_schema().message_key().unwrap().to_string()],
1532 "test2".into()
1533 );
1534 }
1535
1536 #[cfg(unix)]
1537 fn parses_unix_config(mode: &str) -> SocketConfig {
1538 toml::from_str::<SocketConfig>(&format!(
1539 r#"
1540 mode = "{mode}"
1541 path = "/does/not/exist"
1542 "#
1543 ))
1544 .unwrap()
1545 }
1546
1547 #[cfg(unix)]
1548 fn parses_unix_config_file_mode(mode: &str) -> SocketConfig {
1549 toml::from_str::<SocketConfig>(&format!(
1550 r#"
1551 mode = "{mode}"
1552 path = "/does/not/exist"
1553 socket_file_mode = 0o777
1554 "#
1555 ))
1556 .unwrap()
1557 }
1558
1559 #[cfg(unix)]
1561 async fn send_lines_unix_datagram(path: PathBuf, lines: &[&str]) {
1562 let packets = lines.iter().map(|line| Bytes::from(line.to_string()));
1563 send_packets_unix_datagram(path, packets).await;
1564 }
1565
1566 #[cfg(unix)]
1567 async fn send_packets_unix_datagram(path: PathBuf, packets: impl IntoIterator<Item = Bytes>) {
1568 let socket = UnixDatagram::unbound().unwrap();
1569 socket.connect(path).unwrap();
1570
1571 for packet in packets {
1572 socket.send(&packet).await.unwrap();
1573 }
1574 socket.shutdown(std::net::Shutdown::Both).unwrap();
1575 }
1576
1577 #[cfg(unix)]
1578 #[tokio::test]
1579 async fn unix_datagram_message() {
1580 assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async {
1581 let (_, rx) = unix_message("test", false, false).await;
1582 let events = collect_n(rx, 1).await;
1583
1584 assert_eq!(events.len(), 1);
1585 assert_eq!(
1586 events[0].as_log()[log_schema().message_key().unwrap().to_string()],
1587 "test".into()
1588 );
1589 assert_eq!(
1590 events[0].as_log()[log_schema().source_type_key().unwrap().to_string()],
1591 "socket".into()
1592 );
1593 assert_eq!(events[0].as_log()["host"], UNNAMED_SOCKET_HOST.into());
1594 })
1595 .await;
1596 }
1597
1598 #[ignore]
1599 #[cfg(unix)]
1600 #[tokio::test]
1601 async fn unix_datagram_socket_test() {
1602 use tempfile::tempdir;
1606 use tokio::net::UnixDatagram;
1607
1608 let tmp = tempdir().unwrap();
1609
1610 let tx_path = tmp.path().join("tx");
1611
1612 let tx_type = "bound";
1615
1616 let tx = if tx_type == "bound" {
1617 UnixDatagram::bind(&tx_path).unwrap()
1618 } else {
1619 UnixDatagram::unbound().unwrap()
1620 };
1621
1622 let rx_path = tmp.path().join("rx");
1628 let rx = UnixDatagram::bind(&rx_path).unwrap();
1629
1630 tx.connect(&rx_path).unwrap();
1632
1633 let bytes = b"hello world";
1635 tx.send(bytes).await.unwrap();
1636
1637 let mut buf = vec![0u8; 24];
1638 let (size, _) = rx.recv_from(&mut buf).await.unwrap();
1639
1640 let dgram = &buf[..size];
1641 assert_eq!(dgram, bytes);
1642 }
1643
1644 #[cfg(unix)]
1645 #[tokio::test]
1646 async fn unix_datagram_chunked_gelf_messages() {
1647 assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async {
1648 let (tx, rx) = SourceSender::new_test();
1649 let in_path = tempfile::tempdir().unwrap().keep().join("unix_test");
1650 let mut config = UnixConfig::new(in_path.clone());
1651 config.decoding = GelfDeserializerConfig::default().into();
1652 let path = init_unix_with_config(tx, false, false, config).await;
1653 let seed = 42;
1654 let mut rng = SmallRng::seed_from_u64(seed);
1655 let max_size = 20;
1656 let big_message = "This is a very large message".repeat(5);
1657 let another_big_message = "This is another very large message".repeat(5);
1658 let mut chunks = get_gelf_chunks(big_message.as_str(), max_size, &mut rng);
1659 let mut another_chunks =
1660 get_gelf_chunks(another_big_message.as_str(), max_size, &mut rng);
1661 chunks.append(&mut another_chunks);
1662 chunks.shuffle(&mut rng);
1663
1664 send_packets_unix_datagram(path, chunks).await;
1665
1666 let events = collect_n(rx, 2).await;
1667 assert_eq!(
1668 events[0].as_log()[log_schema().message_key().unwrap().to_string()],
1669 big_message.into()
1670 );
1671 assert_eq!(
1672 events[1].as_log()[log_schema().message_key().unwrap().to_string()],
1673 another_big_message.into()
1674 );
1675 })
1676 .await;
1677 }
1678
1679 #[cfg(unix)]
1680 #[tokio::test]
1681 async fn unix_datagram_message_with_vector_namespace() {
1682 assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async {
1683 let (_, rx) = unix_message("test", false, true).await;
1684 let events = collect_n(rx, 1).await;
1685 let log = events[0].as_log();
1686 let event_meta = log.metadata().value();
1687
1688 assert_eq!(log.value(), &"test".into());
1689 assert_eq!(events.len(), 1);
1690
1691 assert_eq!(
1692 event_meta.get(path!("vector", "source_type")).unwrap(),
1693 &value!(SocketConfig::NAME)
1694 );
1695
1696 assert_eq!(
1697 event_meta.get(path!(SocketConfig::NAME, "host")).unwrap(),
1698 &value!(UNNAMED_SOCKET_HOST)
1699 );
1700 })
1701 .await;
1702 }
1703
1704 #[cfg(unix)]
1705 #[tokio::test]
1706 async fn unix_datagram_message_preserves_newline() {
1707 assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async {
1708 let (_, rx) = unix_message("foo\nbar", false, false).await;
1709 let events = collect_n(rx, 1).await;
1710
1711 assert_eq!(events.len(), 1);
1712 assert_eq!(
1713 events[0].as_log()[log_schema().message_key().unwrap().to_string()],
1714 "foo\nbar".into()
1715 );
1716 assert_eq!(
1717 events[0].as_log()[log_schema().source_type_key().unwrap().to_string()],
1718 "socket".into()
1719 );
1720 })
1721 .await;
1722 }
1723
1724 #[cfg(unix)]
1725 #[tokio::test]
1726 async fn unix_datagram_multiple_packets() {
1727 assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async {
1728 unix_multiple_packets(false).await
1729 })
1730 .await;
1731 }
1732
/// Checks that `mode = "unix_datagram"` deserializes into `Mode::UnixDatagram`.
#[cfg(unix)]
#[test]
fn parses_unix_datagram_config() {
    let SocketConfig { mode } = parses_unix_config("unix_datagram");
    assert!(matches!(mode, Mode::UnixDatagram(_)));
}
1739
/// Checks that `mode = "unix_datagram"` still deserializes into `Mode::UnixDatagram` when
/// `socket_file_mode` is present in the configuration.
#[cfg(unix)]
#[test]
fn parses_unix_datagram_perms() {
    let SocketConfig { mode } = parses_unix_config_file_mode("unix_datagram");
    assert!(matches!(mode, Mode::UnixDatagram(_)));
}
1746
1747 #[cfg(unix)]
1748 #[tokio::test]
1749 async fn unix_datagram_permissions() {
1750 let in_path = tempfile::tempdir().unwrap().keep().join("unix_test");
1751 let (tx, _) = SourceSender::new_test();
1752
1753 let mut config = UnixConfig::new(in_path.clone());
1754 config.socket_file_mode = Some(0o555);
1755 let mode = Mode::UnixDatagram(config);
1756 let server = SocketConfig { mode }
1757 .build(SourceContext::new_test(tx, None))
1758 .await
1759 .unwrap();
1760 tokio::spawn(server);
1761
1762 wait_for(|| {
1763 match std::fs::metadata(&in_path) {
1764 Ok(meta) => {
1765 match meta.permissions().mode() {
1766 0o140555 => ready(true),
1768 _ => ready(false),
1769 }
1770 }
1771 Err(_) => ready(false),
1772 }
1773 })
1774 .await;
1775 }
1776
1777 #[cfg(unix)]
1779 async fn send_lines_unix_stream(path: PathBuf, lines: &[&str]) {
1780 let socket = UnixStream::connect(path).await.unwrap();
1781 let mut sink = FramedWrite::new(socket, LinesCodec::new());
1782
1783 let lines = lines.iter().map(|s| Ok(s.to_string()));
1784 let lines = lines.collect::<Vec<_>>();
1785 sink.send_all(&mut stream::iter(lines)).await.unwrap();
1786
1787 let mut socket = sink.into_inner();
1788 socket.shutdown().await.unwrap();
1789 }
1790
1791 #[cfg(unix)]
1792 #[tokio::test]
1793 async fn unix_stream_message() {
1794 assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async {
1795 let (_, rx) = unix_message("test", true, false).await;
1796 let events = collect_n(rx, 1).await;
1797
1798 assert_eq!(1, events.len());
1799 assert_eq!(
1800 events[0].as_log()[log_schema().message_key().unwrap().to_string()],
1801 "test".into()
1802 );
1803 assert_eq!(
1804 events[0].as_log()[log_schema().source_type_key().unwrap().to_string()],
1805 "socket".into()
1806 );
1807 })
1808 .await;
1809 }
1810
1811 #[cfg(unix)]
1812 #[tokio::test]
1813 async fn unix_stream_message_with_vector_namespace() {
1814 assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async {
1815 let (_, rx) = unix_message("test", true, true).await;
1816 let events = collect_n(rx, 1).await;
1817 let log = events[0].as_log();
1818 let event_meta = log.metadata().value();
1819
1820 assert_eq!(log.value(), &"test".into());
1821 assert_eq!(1, events.len());
1822 assert_eq!(
1823 event_meta.get(path!("vector", "source_type")).unwrap(),
1824 &value!(SocketConfig::NAME)
1825 );
1826 assert_eq!(
1827 event_meta.get(path!(SocketConfig::NAME, "host")).unwrap(),
1828 &value!(UNNAMED_SOCKET_HOST)
1829 );
1830 })
1831 .await;
1832 }
1833
1834 #[cfg(unix)]
1835 #[tokio::test]
1836 async fn unix_stream_message_splits_on_newline() {
1837 assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async {
1838 let (_, rx) = unix_message("foo\nbar", true, false).await;
1839 let events = collect_n(rx, 2).await;
1840
1841 assert_eq!(events.len(), 2);
1842 assert_eq!(
1843 events[0].as_log()[log_schema().message_key().unwrap().to_string()],
1844 "foo".into()
1845 );
1846 assert_eq!(
1847 events[0].as_log()[log_schema().source_type_key().unwrap().to_string()],
1848 "socket".into()
1849 );
1850 assert_eq!(
1851 events[1].as_log()[log_schema().message_key().unwrap().to_string()],
1852 "bar".into()
1853 );
1854 assert_eq!(
1855 events[1].as_log()[log_schema().source_type_key().unwrap().to_string()],
1856 "socket".into()
1857 );
1858 })
1859 .await;
1860 }
1861
1862 #[cfg(unix)]
1863 #[tokio::test]
1864 async fn unix_stream_multiple_packets() {
1865 assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async {
1866 unix_multiple_packets(true).await
1867 })
1868 .await;
1869 }
1870
/// Checks that `mode = "unix_stream"` deserializes into `Mode::UnixStream`.
#[cfg(unix)]
#[test]
fn parses_new_unix_stream_config() {
    let SocketConfig { mode } = parses_unix_config("unix_stream");
    assert!(matches!(mode, Mode::UnixStream(_)));
}
1877
/// Checks that `mode = "unix_stream"` still deserializes into `Mode::UnixStream` when
/// `socket_file_mode` is present in the configuration.
///
/// NOTE(review): despite "datagram" in its name, this test exercises the stream mode; the
/// name is kept so existing test filters keep matching.
#[cfg(unix)]
#[test]
fn parses_new_unix_datagram_perms() {
    let SocketConfig { mode } = parses_unix_config_file_mode("unix_stream");
    assert!(matches!(mode, Mode::UnixStream(_)));
}
1884
/// Checks that the `mode = "unix"` spelling deserializes into `Mode::UnixStream`.
#[cfg(unix)]
#[test]
fn parses_old_unix_stream_config() {
    let SocketConfig { mode } = parses_unix_config("unix");
    assert!(matches!(mode, Mode::UnixStream(_)));
}
1891
1892 #[cfg(unix)]
1893 #[tokio::test]
1894 async fn unix_stream_permissions() {
1895 let in_path = tempfile::tempdir().unwrap().keep().join("unix_test");
1896 let (tx, _) = SourceSender::new_test();
1897
1898 let mut config = UnixConfig::new(in_path.clone());
1899 config.socket_file_mode = Some(0o421);
1900 let mode = Mode::UnixStream(config);
1901 let server = SocketConfig { mode }
1902 .build(SourceContext::new_test(tx, None))
1903 .await
1904 .unwrap();
1905 tokio::spawn(server);
1906
1907 wait_for(|| {
1908 match std::fs::metadata(&in_path) {
1909 Ok(meta) => {
1910 match meta.permissions().mode() {
1911 0o140421 => ready(true),
1913 _ => ready(false),
1914 }
1915 }
1916 Err(_) => ready(false),
1917 }
1918 })
1919 .await;
1920 }
1921}