// vector/sources/socket/mod.rs

1pub mod tcp;
2pub mod udp;
3#[cfg(unix)]
4mod unix;
5
6use vector_lib::{
7    codecs::decoding::DeserializerConfig,
8    config::{LegacyKey, LogNamespace, log_schema},
9    configurable::configurable_component,
10    lookup::{lookup_v2::OptionalValuePath, owned_value_path},
11};
12use vrl::value::{Kind, kind::Collection};
13
14use crate::{
15    codecs::DecodingConfig,
16    config::{GenerateConfig, Resource, SourceConfig, SourceContext, SourceOutput},
17    sources::util::net::TcpSource,
18    tls::MaybeTlsSettings,
19};
20
/// Configuration for the `socket` source.
// NOTE: the `///` text on this type and on `Mode` is presumably consumed by
// `configurable_component` to produce the generated configuration docs, so
// maintainer-only notes like this one are written as plain `//` comments.
#[configurable_component(source("socket", "Collect logs over a socket."))]
#[derive(Clone, Debug)]
pub struct SocketConfig {
    // Flattened: the `mode` tag and the selected mode's options sit directly in the
    // source's own table (see `generate_config`: `mode = "tcp"` next to `address`).
    #[serde(flatten)]
    pub mode: Mode,
}

/// Listening mode for the `socket` source.
// Internally tagged by the `mode` key with snake_cased variant names, so e.g.
// `UnixDatagram` is selected with `mode = "unix_datagram"`.
#[configurable_component]
#[derive(Clone, Debug)]
#[serde(tag = "mode", rename_all = "snake_case")]
#[configurable(metadata(docs::enum_tag_description = "The type of socket to use."))]
#[allow(clippy::large_enum_variant)] // just used for configuration
pub enum Mode {
    /// Listen on TCP.
    Tcp(tcp::TcpConfig),

    /// Listen on UDP.
    Udp(udp::UdpConfig),

    /// Listen on a Unix domain socket (UDS), in datagram mode.
    // Only compiled on Unix targets; every `match` on `Mode` must gate the Unix
    // arms with the same `#[cfg(unix)]`.
    #[cfg(unix)]
    UnixDatagram(unix::UnixConfig),

    /// Listen on a Unix domain socket (UDS), in stream mode.
    // `mode = "unix"` is also accepted for this variant via the serde alias.
    #[cfg(unix)]
    #[serde(alias = "unix")]
    UnixStream(unix::UnixConfig),
}
51
52impl SocketConfig {
53    pub fn new_tcp(tcp_config: tcp::TcpConfig) -> Self {
54        tcp_config.into()
55    }
56
57    pub fn make_basic_tcp_config(addr: std::net::SocketAddr) -> Self {
58        tcp::TcpConfig::from_address(addr.into()).into()
59    }
60
61    fn decoding(&self) -> DeserializerConfig {
62        match &self.mode {
63            Mode::Tcp(config) => config.decoding().clone(),
64            Mode::Udp(config) => config.decoding().clone(),
65            #[cfg(unix)]
66            Mode::UnixDatagram(config) => config.decoding().clone(),
67            #[cfg(unix)]
68            Mode::UnixStream(config) => config.decoding().clone(),
69        }
70    }
71
72    fn log_namespace(&self, global_log_namespace: LogNamespace) -> LogNamespace {
73        match &self.mode {
74            Mode::Tcp(config) => global_log_namespace.merge(config.log_namespace),
75            Mode::Udp(config) => global_log_namespace.merge(config.log_namespace),
76            #[cfg(unix)]
77            Mode::UnixDatagram(config) => global_log_namespace.merge(config.log_namespace),
78            #[cfg(unix)]
79            Mode::UnixStream(config) => global_log_namespace.merge(config.log_namespace),
80        }
81    }
82}
83
84impl From<tcp::TcpConfig> for SocketConfig {
85    fn from(config: tcp::TcpConfig) -> Self {
86        SocketConfig {
87            mode: Mode::Tcp(config),
88        }
89    }
90}
91
92impl From<udp::UdpConfig> for SocketConfig {
93    fn from(config: udp::UdpConfig) -> Self {
94        SocketConfig {
95            mode: Mode::Udp(config),
96        }
97    }
98}
99
100impl GenerateConfig for SocketConfig {
101    fn generate_config() -> toml::Value {
102        toml::from_str(
103            r#"mode = "tcp"
104            address = "0.0.0.0:9000""#,
105        )
106        .unwrap()
107    }
108}
109
110#[async_trait::async_trait]
111#[typetag::serde(name = "socket")]
112impl SourceConfig for SocketConfig {
113    async fn build(&self, cx: SourceContext) -> crate::Result<super::Source> {
114        match self.mode.clone() {
115            Mode::Tcp(config) => {
116                let log_namespace = cx.log_namespace(config.log_namespace);
117
118                let decoding = config.decoding().clone();
119                let decoder = DecodingConfig::new(
120                    config
121                        .framing
122                        .clone()
123                        .unwrap_or_else(|| decoding.default_stream_framing()),
124                    decoding,
125                    log_namespace,
126                )
127                .build()?;
128
129                let tcp = tcp::RawTcpSource::new(config.clone(), decoder, log_namespace);
130                let tls_config = config.tls().as_ref().map(|tls| tls.tls_config.clone());
131                let tls_client_metadata_key = config
132                    .tls()
133                    .as_ref()
134                    .and_then(|tls| tls.client_metadata_key.clone())
135                    .and_then(|k| k.path);
136                let tls = MaybeTlsSettings::from_config(tls_config.as_ref(), true)?;
137                tcp.run(
138                    config.address(),
139                    config.keepalive(),
140                    config.shutdown_timeout_secs(),
141                    tls,
142                    tls_client_metadata_key,
143                    config.receive_buffer_bytes(),
144                    config.max_connection_duration_secs(),
145                    cx,
146                    false.into(),
147                    config.connection_limit,
148                    config.permit_origin.map(Into::into),
149                    SocketConfig::NAME,
150                    log_namespace,
151                )
152            }
153            Mode::Udp(config) => {
154                let log_namespace = cx.log_namespace(config.log_namespace);
155                let decoding = config.decoding().clone();
156                let framing = config
157                    .framing()
158                    .clone()
159                    .unwrap_or_else(|| decoding.default_message_based_framing());
160                let decoder = DecodingConfig::new(framing, decoding, log_namespace).build()?;
161                Ok(udp::udp(
162                    config,
163                    decoder,
164                    cx.shutdown,
165                    cx.out,
166                    log_namespace,
167                ))
168            }
169            #[cfg(unix)]
170            Mode::UnixDatagram(config) => {
171                let log_namespace = cx.log_namespace(config.log_namespace);
172                let decoding = config.decoding.clone();
173                let framing = config
174                    .framing
175                    .clone()
176                    .unwrap_or_else(|| decoding.default_message_based_framing());
177                let decoder = DecodingConfig::new(framing, decoding, log_namespace).build()?;
178
179                unix::unix_datagram(config, decoder, cx.shutdown, cx.out, log_namespace)
180            }
181            #[cfg(unix)]
182            Mode::UnixStream(config) => {
183                let log_namespace = cx.log_namespace(config.log_namespace);
184
185                let decoding = config.decoding().clone();
186                let decoder = DecodingConfig::new(
187                    config
188                        .framing
189                        .clone()
190                        .unwrap_or_else(|| decoding.default_stream_framing()),
191                    decoding,
192                    log_namespace,
193                )
194                .build()?;
195
196                unix::unix_stream(config, decoder, cx.shutdown, cx.out, log_namespace)
197            }
198        }
199    }
200
201    fn outputs(&self, global_log_namespace: LogNamespace) -> Vec<SourceOutput> {
202        let log_namespace = self.log_namespace(global_log_namespace);
203
204        let schema_definition = self
205            .decoding()
206            .schema_definition(log_namespace)
207            .with_standard_vector_source_metadata();
208
209        let schema_definition = match &self.mode {
210            Mode::Tcp(config) => {
211                let legacy_host_key = config.host_key().path.map(LegacyKey::InsertIfEmpty);
212
213                let legacy_port_key = config.port_key().clone().path.map(LegacyKey::InsertIfEmpty);
214
215                let tls_client_metadata_path = config
216                    .tls()
217                    .as_ref()
218                    .and_then(|tls| tls.client_metadata_key.as_ref())
219                    .and_then(|k| k.path.clone())
220                    .map(LegacyKey::Overwrite);
221
222                schema_definition
223                    .with_source_metadata(
224                        Self::NAME,
225                        legacy_host_key,
226                        &owned_value_path!("host"),
227                        Kind::bytes(),
228                        Some("host"),
229                    )
230                    .with_source_metadata(
231                        Self::NAME,
232                        legacy_port_key,
233                        &owned_value_path!("port"),
234                        Kind::integer(),
235                        None,
236                    )
237                    .with_source_metadata(
238                        Self::NAME,
239                        tls_client_metadata_path,
240                        &owned_value_path!("tls_client_metadata"),
241                        Kind::object(Collection::empty().with_unknown(Kind::bytes()))
242                            .or_undefined(),
243                        None,
244                    )
245            }
246            Mode::Udp(config) => {
247                let legacy_host_key = config.host_key().path.map(LegacyKey::InsertIfEmpty);
248
249                let legacy_port_key = config.port_key().clone().path.map(LegacyKey::InsertIfEmpty);
250
251                schema_definition
252                    .with_source_metadata(
253                        Self::NAME,
254                        legacy_host_key,
255                        &owned_value_path!("host"),
256                        Kind::bytes(),
257                        None,
258                    )
259                    .with_source_metadata(
260                        Self::NAME,
261                        legacy_port_key,
262                        &owned_value_path!("port"),
263                        Kind::integer(),
264                        None,
265                    )
266            }
267            #[cfg(unix)]
268            Mode::UnixDatagram(config) => {
269                let legacy_host_key = config.host_key().clone().path.map(LegacyKey::InsertIfEmpty);
270
271                schema_definition.with_source_metadata(
272                    Self::NAME,
273                    legacy_host_key,
274                    &owned_value_path!("host"),
275                    Kind::bytes(),
276                    None,
277                )
278            }
279            #[cfg(unix)]
280            Mode::UnixStream(config) => {
281                let legacy_host_key = config.host_key().clone().path.map(LegacyKey::InsertIfEmpty);
282
283                schema_definition.with_source_metadata(
284                    Self::NAME,
285                    legacy_host_key,
286                    &owned_value_path!("host"),
287                    Kind::bytes(),
288                    None,
289                )
290            }
291        };
292
293        vec![SourceOutput::new_maybe_logs(
294            self.decoding().output_type(),
295            schema_definition,
296        )]
297    }
298
299    fn resources(&self) -> Vec<Resource> {
300        match self.mode.clone() {
301            Mode::Tcp(tcp) => vec![tcp.address().as_tcp_resource()],
302            Mode::Udp(udp) => vec![udp.address().as_udp_resource()],
303            #[cfg(unix)]
304            Mode::UnixDatagram(_) => vec![],
305            #[cfg(unix)]
306            Mode::UnixStream(_) => vec![],
307        }
308    }
309
310    fn can_acknowledge(&self) -> bool {
311        false
312    }
313}
314
315pub(crate) fn default_host_key() -> OptionalValuePath {
316    log_schema().host_key().cloned().into()
317}
318
319#[cfg(test)]
320mod test {
321    use std::{
322        collections::HashMap,
323        net::{IpAddr, Ipv4Addr, SocketAddr, UdpSocket},
324        sync::{
325            Arc,
326            atomic::{AtomicBool, Ordering},
327        },
328        thread,
329    };
330
331    use approx::assert_relative_eq;
332    use bytes::{BufMut, Bytes, BytesMut};
333    use futures::{StreamExt, stream};
334    use rand::{SeedableRng, rngs::SmallRng, seq::SliceRandom};
335    use serde_json::json;
336    use tokio::{
337        io::AsyncReadExt,
338        net::TcpStream,
339        task::JoinHandle,
340        time::{Duration, Instant, timeout},
341    };
342    #[cfg(unix)]
343    use vector_lib::codecs::{
344        CharacterDelimitedDecoderConfig, decoding::CharacterDelimitedDecoderOptions,
345    };
346    use vector_lib::{
347        codecs::{GelfDeserializerConfig, NewlineDelimitedDecoderConfig},
348        event::EventContainer,
349        lookup::{lookup_v2::OptionalValuePath, owned_value_path, path},
350    };
351    use vrl::{btreemap, value, value::ObjectMap};
352    #[cfg(unix)]
353    use {
354        super::{Mode, unix::UnixConfig},
355        crate::sources::util::unix::UNNAMED_SOCKET_HOST,
356        crate::test_util::wait_for,
357        futures::{SinkExt, Stream},
358        std::future::ready,
359        std::os::unix::fs::PermissionsExt,
360        std::path::PathBuf,
361        tokio::{
362            io::AsyncWriteExt,
363            net::{UnixDatagram, UnixStream},
364            task::yield_now,
365        },
366        tokio_util::codec::{FramedWrite, LinesCodec},
367    };
368
369    use super::{SocketConfig, tcp::TcpConfig, udp::UdpConfig};
370    use crate::{
371        SourceSender,
372        config::{ComponentKey, GlobalOptions, SourceConfig, SourceContext, log_schema},
373        event::{Event, LogEvent},
374        shutdown::{ShutdownSignal, SourceShutdownCoordinator},
375        sinks::util::tcp::TcpSinkConfig,
376        sources::util::net::SocketListenAddr,
377        test_util::{
378            addr::{PortGuard, next_addr, next_addr_any},
379            collect_n, collect_n_limited,
380            components::{
381                COMPONENT_ERROR_TAGS, SOCKET_PUSH_SOURCE_TAGS, assert_source_compliance,
382                assert_source_error,
383            },
384            random_string, send_lines, send_lines_tls, wait_for_tcp,
385        },
386        tls::{self, TlsConfig, TlsEnableableConfig, TlsSourceConfig},
387    };
388
389    async fn wait_for_tcp_and_release(guard: PortGuard, addr: SocketAddr) {
390        wait_for_tcp(addr).await;
391        drop(guard) // Now we're sure the socket was bound by the server and we can release the guard
392    }
393
394    pub fn bind_unused_udp() -> UdpSocket {
395        // Bind to port 0 to let the OS assign an available port
396        UdpSocket::bind((IpAddr::V4(Ipv4Addr::LOCALHOST), 0))
397            .expect("Failed to bind UDP socket to OS-assigned port")
398    }
399
400    pub fn bind_unused_udp_any() -> UdpSocket {
401        // Bind to port 0 to let the OS assign an available port
402        UdpSocket::bind((IpAddr::V4(Ipv4Addr::UNSPECIFIED), 0))
403            .expect("Failed to bind UDP socket to OS-assigned port")
404    }
405
406    fn get_gelf_payload(message: &str) -> String {
407        serde_json::to_string(&json!({
408            "version": "1.1",
409            "host": "example.org",
410            "short_message": message,
411            "timestamp": 1234567890.123,
412            "level": 6,
413            "_foo": "bar",
414        }))
415        .unwrap()
416    }
417
418    fn create_gelf_chunk(
419        message_id: u64,
420        sequence_number: u8,
421        total_chunks: u8,
422        payload: &[u8],
423    ) -> Bytes {
424        const GELF_MAGIC: [u8; 2] = [0x1e, 0x0f];
425        let mut chunk = BytesMut::new();
426        chunk.put_slice(&GELF_MAGIC);
427        chunk.put_u64(message_id);
428        chunk.put_u8(sequence_number);
429        chunk.put_u8(total_chunks);
430        chunk.put(payload);
431        chunk.freeze()
432    }
433
434    fn get_gelf_chunks(short_message: &str, max_size: usize, rng: &mut SmallRng) -> Vec<Bytes> {
435        let message_id = rand::random();
436        let payload = get_gelf_payload(short_message);
437        let payload_chunks = payload.as_bytes().chunks(max_size).collect::<Vec<_>>();
438        let total_chunks = payload_chunks.len();
439        assert!(total_chunks <= 128, "too many gelf chunks");
440
441        let mut chunks = payload_chunks
442            .into_iter()
443            .enumerate()
444            .map(|(i, payload_chunk)| {
445                create_gelf_chunk(message_id, i as u8, total_chunks as u8, payload_chunk)
446            })
447            .collect::<Vec<_>>();
448        // Shuffle the chunks to simulate out-of-order delivery
449        chunks.shuffle(rng);
450        chunks
451    }
452
453    #[test]
454    fn generate_config() {
455        crate::test_util::test_generate_config::<SocketConfig>();
456    }
457
458    //////// TCP TESTS ////////
459    #[tokio::test]
460    async fn tcp_it_includes_host() {
461        assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async {
462            let (tx, mut rx) = SourceSender::new_test();
463            let (guard, addr) = next_addr();
464
465            let server = SocketConfig::from(TcpConfig::from_address(addr.into()))
466                .build(SourceContext::new_test(tx, None))
467                .await
468                .unwrap();
469            tokio::spawn(server);
470
471            wait_for_tcp_and_release(guard, addr).await;
472
473            let addr = send_lines(addr, vec!["test".to_owned()].into_iter())
474                .await
475                .unwrap();
476
477            let event = rx.next().await.unwrap();
478
479            assert_eq!(event.as_log()["host"], addr.ip().to_string().into());
480            assert_eq!(event.as_log()["port"], addr.port().into());
481        })
482        .await;
483    }
484
485    #[tokio::test]
486    async fn tcp_it_includes_vector_namespaced_fields() {
487        assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async {
488            let (tx, mut rx) = SourceSender::new_test();
489            let (guard, addr) = next_addr();
490            let mut conf = TcpConfig::from_address(addr.into());
491            conf.set_log_namespace(Some(true));
492
493            let server = SocketConfig::from(conf)
494                .build(SourceContext::new_test(tx, None))
495                .await
496                .unwrap();
497            tokio::spawn(server);
498
499            wait_for_tcp_and_release(guard, addr).await;
500
501            let addr = send_lines(addr, vec!["test".to_owned()].into_iter())
502                .await
503                .unwrap();
504
505            let event = rx.next().await.unwrap();
506            let log = event.as_log();
507            let event_meta = log.metadata().value();
508
509            assert_eq!(log.value(), &"test".into());
510            assert_eq!(
511                event_meta.get(path!("vector", "source_type")).unwrap(),
512                &value!(SocketConfig::NAME)
513            );
514            assert_eq!(
515                event_meta.get(path!(SocketConfig::NAME, "host")).unwrap(),
516                &value!(addr.ip().to_string())
517            );
518            assert_eq!(
519                event_meta.get(path!(SocketConfig::NAME, "port")).unwrap(),
520                &value!(addr.port())
521            );
522        })
523        .await;
524    }
525
526    #[tokio::test]
527    async fn tcp_splits_on_newline() {
528        assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async {
529            let (tx, rx) = SourceSender::new_test();
530            let (guard, addr) = next_addr();
531
532            let server = SocketConfig::from(TcpConfig::from_address(addr.into()))
533                .build(SourceContext::new_test(tx, None))
534                .await
535                .unwrap();
536            tokio::spawn(server);
537
538            wait_for_tcp_and_release(guard, addr).await;
539
540            send_lines(addr, vec!["foo\nbar".to_owned()].into_iter())
541                .await
542                .unwrap();
543
544            let events = collect_n(rx, 2).await;
545
546            assert_eq!(events.len(), 2);
547            assert_eq!(
548                events[0].as_log()[log_schema().message_key().unwrap().to_string()],
549                "foo".into()
550            );
551            assert_eq!(
552                events[1].as_log()[log_schema().message_key().unwrap().to_string()],
553                "bar".into()
554            );
555        })
556        .await;
557    }
558
559    #[tokio::test]
560    async fn tcp_it_includes_source_type() {
561        assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async {
562            let (tx, mut rx) = SourceSender::new_test();
563            let (guard, addr) = next_addr();
564
565            let server = SocketConfig::from(TcpConfig::from_address(addr.into()))
566                .build(SourceContext::new_test(tx, None))
567                .await
568                .unwrap();
569            tokio::spawn(server);
570
571            wait_for_tcp_and_release(guard, addr).await;
572            send_lines(addr, vec!["test".to_owned()].into_iter())
573                .await
574                .unwrap();
575
576            let event = rx.next().await.unwrap();
577            assert_eq!(
578                event.as_log()[log_schema().source_type_key().unwrap().to_string()],
579                "socket".into()
580            );
581        })
582        .await;
583    }
584
585    #[tokio::test]
586    async fn tcp_continue_after_long_line() {
587        assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async {
588            let (tx, mut rx) = SourceSender::new_test();
589            let (guard, addr) = next_addr();
590
591            let mut config = TcpConfig::from_address(addr.into());
592            config.set_framing(Some(
593                NewlineDelimitedDecoderConfig::new_with_max_length(10).into(),
594            ));
595
596            let server = SocketConfig::from(config)
597                .build(SourceContext::new_test(tx, None))
598                .await
599                .unwrap();
600            tokio::spawn(server);
601
602            let lines = vec![
603                "short".to_owned(),
604                "this is too long".to_owned(),
605                "more short".to_owned(),
606            ];
607
608            wait_for_tcp_and_release(guard, addr).await;
609            send_lines(addr, lines.into_iter()).await.unwrap();
610
611            let event = rx.next().await.unwrap();
612            assert_eq!(
613                event.as_log()[log_schema().message_key().unwrap().to_string()],
614                "short".into()
615            );
616
617            let event = rx.next().await.unwrap();
618            assert_eq!(
619                event.as_log()[log_schema().message_key().unwrap().to_string()],
620                "more short".into()
621            );
622        })
623        .await;
624    }
625
626    #[tokio::test]
627    async fn tcp_with_tls() {
628        assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async {
629            let (tx, mut rx) = SourceSender::new_test();
630            let (guard, addr) = next_addr();
631
632            let mut config = TcpConfig::from_address(addr.into());
633            config.set_tls(Some(TlsSourceConfig {
634                tls_config: TlsEnableableConfig {
635                    enabled: Some(true),
636                    options: TlsConfig {
637                        verify_certificate: Some(true),
638                        crt_file: Some(tls::TEST_PEM_CRT_PATH.into()),
639                        key_file: Some(tls::TEST_PEM_KEY_PATH.into()),
640                        ca_file: Some(tls::TEST_PEM_CA_PATH.into()),
641                        ..Default::default()
642                    },
643                },
644                client_metadata_key: Some(OptionalValuePath::from(owned_value_path!("tls_peer"))),
645            }));
646
647            let server = SocketConfig::from(config)
648                .build(SourceContext::new_test(tx, None))
649                .await
650                .unwrap();
651            tokio::spawn(server);
652
653            let lines = vec!["one line".to_owned(), "another line".to_owned()];
654
655            wait_for_tcp_and_release(guard, addr).await;
656            send_lines_tls(
657                addr,
658                "localhost".into(),
659                lines.into_iter(),
660                std::path::Path::new(tls::TEST_PEM_CA_PATH),
661                std::path::Path::new(tls::TEST_PEM_CLIENT_CRT_PATH),
662                std::path::Path::new(tls::TEST_PEM_CLIENT_KEY_PATH),
663            )
664            .await
665            .unwrap();
666
667            let event = rx.next().await.unwrap();
668            assert_eq!(
669                event.as_log()[log_schema().message_key().unwrap().to_string()],
670                "one line".into()
671            );
672
673            let tls_meta: ObjectMap = btreemap!(
674                "subject" => "CN=localhost,OU=Vector,O=Datadog,L=New York,ST=New York,C=US"
675            );
676
677            assert_eq!(event.as_log()["tls_peer"], tls_meta.clone().into(),);
678
679            let event = rx.next().await.unwrap();
680            assert_eq!(
681                event.as_log()[log_schema().message_key().unwrap().to_string()],
682                "another line".into()
683            );
684
685            assert_eq!(event.as_log()["tls_peer"], tls_meta.clone().into(),);
686        })
687        .await;
688    }
689
690    #[tokio::test]
691    async fn tcp_with_tls_vector_namespace() {
692        assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async {
693            let (tx, mut rx) = SourceSender::new_test();
694            let (guard, addr) = next_addr();
695
696            let mut config = TcpConfig::from_address(addr.into());
697            config.set_tls(Some(TlsSourceConfig {
698                tls_config: TlsEnableableConfig {
699                    enabled: Some(true),
700                    options: TlsConfig {
701                        verify_certificate: Some(true),
702                        crt_file: Some(tls::TEST_PEM_CRT_PATH.into()),
703                        key_file: Some(tls::TEST_PEM_KEY_PATH.into()),
704                        ca_file: Some(tls::TEST_PEM_CA_PATH.into()),
705                        ..Default::default()
706                    },
707                },
708                client_metadata_key: None,
709            }));
710            config.log_namespace = Some(true);
711
712            let server = SocketConfig::from(config)
713                .build(SourceContext::new_test(tx, None))
714                .await
715                .unwrap();
716            tokio::spawn(server);
717
718            let lines = vec!["one line".to_owned(), "another line".to_owned()];
719
720            wait_for_tcp_and_release(guard, addr).await;
721            send_lines_tls(
722                addr,
723                "localhost".into(),
724                lines.into_iter(),
725                std::path::Path::new(tls::TEST_PEM_CA_PATH),
726                std::path::Path::new(tls::TEST_PEM_CLIENT_CRT_PATH),
727                std::path::Path::new(tls::TEST_PEM_CLIENT_KEY_PATH),
728            )
729            .await
730            .unwrap();
731
732            let event = rx.next().await.unwrap();
733            let log = event.as_log();
734            let event_meta = log.metadata().value();
735
736            assert_eq!(log.value(), &"one line".into());
737
738            let tls_meta: ObjectMap = btreemap!(
739                "subject" => "CN=localhost,OU=Vector,O=Datadog,L=New York,ST=New York,C=US"
740            );
741
742            assert_eq!(
743                event_meta
744                    .get(path!(SocketConfig::NAME, "tls_client_metadata"))
745                    .unwrap(),
746                &value!(tls_meta.clone())
747            );
748
749            let event = rx.next().await.unwrap();
750            let log = event.as_log();
751            let event_meta = log.metadata().value();
752
753            assert_eq!(log.value(), &"another line".into());
754
755            assert_eq!(
756                event_meta
757                    .get(path!(SocketConfig::NAME, "tls_client_metadata"))
758                    .unwrap(),
759                &value!(tls_meta.clone())
760            );
761        })
762        .await;
763    }
764
765    #[tokio::test]
766    async fn tcp_shutdown_simple() {
767        assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async {
768            let source_id = ComponentKey::from("tcp_shutdown_simple");
769            let (tx, mut rx) = SourceSender::new_test();
770            let (guard, addr) = next_addr();
771            let (cx, mut shutdown) = SourceContext::new_shutdown(&source_id, tx);
772
773            // Start TCP Source
774            let server = SocketConfig::from(TcpConfig::from_address(addr.into()))
775                .build(cx)
776                .await
777                .unwrap();
778            let source_handle = tokio::spawn(server);
779
780            // Send data to Source.
781            wait_for_tcp_and_release(guard, addr).await;
782            send_lines(addr, vec!["test".to_owned()].into_iter())
783                .await
784                .unwrap();
785
786            let event = rx.next().await.unwrap();
787            assert_eq!(
788                event.as_log()[log_schema().message_key().unwrap().to_string()],
789                "test".into()
790            );
791
792            // Now signal to the Source to shut down.
793            let deadline = Instant::now() + Duration::from_secs(10);
794            let shutdown_complete = shutdown.shutdown_source(&source_id, deadline);
795            let shutdown_success = shutdown_complete.await;
796            assert!(shutdown_success);
797
798            // Ensure source actually shut down successfully.
799            _ = source_handle.await.unwrap();
800        })
801        .await;
802    }
803
    // Intentionally not using assert_source_compliance here: this is a round-trip test, so the
    // source and the sink both emit `EventsSent`, which would trip the multi-emission check.
806    #[tokio::test]
807    async fn tcp_shutdown_infinite_stream() {
808        // We create our TCP source with a larger-than-normal send buffer, which helps ensure that
809        // the source doesn't block on sending the events downstream, otherwise if it was blocked on
810        // doing so, it wouldn't be able to wake up and loop to see that it had been signalled to
811        // shutdown.
812        let (guard, addr) = next_addr();
813
814        let (source_tx, source_rx) = SourceSender::new_test_sender_with_options(10_000, None);
815        let source_key = ComponentKey::from("tcp_shutdown_infinite_stream");
816        let (source_cx, mut shutdown) = SourceContext::new_shutdown(&source_key, source_tx);
817
818        let mut source_config = TcpConfig::from_address(addr.into());
819        source_config.set_shutdown_timeout_secs(1);
820        let source_task = SocketConfig::from(source_config)
821            .build(source_cx)
822            .await
823            .unwrap();
824
825        // Spawn the source task and wait until we're sure it's listening:
826        let source_handle = tokio::spawn(source_task);
827        wait_for_tcp_and_release(guard, addr).await;
828
829        // Now we create a TCP _sink_ which we'll feed with an infinite stream of events to ship to
830        // our TCP source.  This will ensure that our TCP source is fully-loaded as we try to shut
831        // it down, exercising the logic we have to ensure timely shutdown even under load:
832        let message = random_string(512);
833        let message_bytes = Bytes::from(message.clone());
834
835        #[derive(Clone, Debug)]
836        struct Serializer {
837            bytes: Bytes,
838        }
839        impl tokio_util::codec::Encoder<Event> for Serializer {
840            type Error = vector_lib::codecs::encoding::Error;
841
842            fn encode(&mut self, _: Event, buffer: &mut BytesMut) -> Result<(), Self::Error> {
843                buffer.put(self.bytes.as_ref());
844                buffer.put_u8(b'\n');
845                Ok(())
846            }
847        }
848        let sink_config = TcpSinkConfig::from_address(format!("localhost:{}", addr.port()));
849        let encoder = Serializer {
850            bytes: message_bytes,
851        };
852        let (sink, _healthcheck) = sink_config.build(Default::default(), encoder).unwrap();
853
854        tokio::spawn(async move {
855            let input = stream::repeat_with(|| LogEvent::default().into()).boxed();
856            sink.run(input).await.unwrap();
857        });
858
859        // Now with our sink running, feeding events to the source, collect 100 event arrays from
860        // the source and make sure each event within them matches the single message we repeatedly
861        // sent via the sink:
862        let events = collect_n_limited(source_rx, 100)
863            .await
864            .into_iter()
865            .collect::<Vec<_>>();
866        assert_eq!(100, events.len());
867
868        let message_key = log_schema().message_key().unwrap().to_string();
869        let expected_message = message.clone().into();
870        for event in events.into_iter().flat_map(EventContainer::into_events) {
871            assert_eq!(event.as_log()[message_key.as_str()], expected_message);
872        }
873
874        // Now trigger shutdown on the source and ensure that it shuts down before or at the
875        // deadline, and make sure the source task actually finished as well:
876        let shutdown_timeout_limit = Duration::from_secs(10);
877        let deadline = Instant::now() + shutdown_timeout_limit;
878        let shutdown_complete = shutdown.shutdown_source(&source_key, deadline);
879
880        let shutdown_result = timeout(shutdown_timeout_limit, shutdown_complete).await;
881        assert_eq!(shutdown_result, Ok(true));
882
883        let source_result = source_handle.await.expect("source task should not panic");
884        assert_eq!(source_result, Ok(()));
885    }
886
887    #[tokio::test]
888    async fn tcp_connection_close_after_max_duration() {
889        let (tx, _) = SourceSender::new_test();
890        let (guard, addr) = next_addr();
891
892        let mut source_config = TcpConfig::from_address(addr.into());
893        source_config.set_max_connection_duration_secs(Some(1));
894        let source_task = SocketConfig::from(source_config)
895            .build(SourceContext::new_test(tx, None))
896            .await
897            .unwrap();
898
899        // Spawn the source task and wait until we're sure it's listening:
900        drop(tokio::spawn(source_task));
901        wait_for_tcp_and_release(guard, addr).await;
902
903        let mut stream: TcpStream = TcpStream::connect(addr)
904            .await
905            .expect("stream should be able to connect");
906        let start = Instant::now();
907
908        let timeout = tokio::time::sleep(Duration::from_millis(1200));
909        let mut buffer = [0u8; 10];
910
911        tokio::select! {
912             _ = timeout => {
913                 panic!("timed out waiting for stream to close")
914             },
915             read_result = stream.read(&mut buffer) => {
916                 match read_result {
917                    // read resulting with 0 bytes -> the connection was closed
918                    Ok(0) => assert_relative_eq!(start.elapsed().as_secs_f64(), 1.0, epsilon = 0.3),
919                    Ok(_) => panic!("unexpectedly read data from stream"),
920                    Err(e) => panic!("{e:}")
921                 }
922             }
923        }
924    }
925
926    //////// UDP TESTS ////////
927    async fn send_lines_udp(to: SocketAddr, lines: impl IntoIterator<Item = String>) -> UdpSocket {
928        send_lines_udp_from(bind_unused_udp(), to, lines)
929    }
930
931    fn send_lines_udp_from(
932        from: UdpSocket,
933        to: SocketAddr,
934        lines: impl IntoIterator<Item = String>,
935    ) -> UdpSocket {
936        send_packets_udp_from(from, to, lines.into_iter().map(|line| line.into()))
937    }
938
939    async fn send_packets_udp(
940        to: SocketAddr,
941        packets: impl IntoIterator<Item = Bytes>,
942    ) -> UdpSocket {
943        send_packets_udp_from(bind_unused_udp(), to, packets)
944    }
945
946    fn send_packets_udp_from(
947        from: UdpSocket,
948        to: SocketAddr,
949        packets: impl IntoIterator<Item = Bytes>,
950    ) -> UdpSocket {
951        for packet in packets {
952            assert_eq!(
953                from.send_to(&packet, to)
954                    .map_err(|error| panic!("{error:}"))
955                    .ok()
956                    .unwrap(),
957                packet.len()
958            );
959            // Space things out slightly to try to avoid dropped packets
960            thread::sleep(Duration::from_millis(1));
961        }
962
963        // Give packets some time to flow through
964        thread::sleep(Duration::from_millis(10));
965
966        // Done
967        from
968    }
969
970    async fn init_udp_with_shutdown(
971        sender: SourceSender,
972        source_id: &ComponentKey,
973        shutdown: &mut SourceShutdownCoordinator,
974    ) -> (SocketAddr, JoinHandle<Result<(), ()>>) {
975        let (shutdown_signal, _) = shutdown.register_source(source_id, false);
976        init_udp_inner(sender, source_id, shutdown_signal, None, false).await
977    }
978
979    async fn init_udp(sender: SourceSender, use_log_namespace: bool) -> SocketAddr {
980        init_udp_inner(
981            sender,
982            &ComponentKey::from("default"),
983            ShutdownSignal::noop(),
984            None,
985            use_log_namespace,
986        )
987        .await
988        .0
989    }
990
991    async fn init_udp_with_config(sender: SourceSender, config: UdpConfig) -> SocketAddr {
992        init_udp_inner(
993            sender,
994            &ComponentKey::from("default"),
995            ShutdownSignal::noop(),
996            Some(config),
997            false,
998        )
999        .await
1000        .0
1001    }
1002
1003    async fn init_udp_inner(
1004        sender: SourceSender,
1005        source_key: &ComponentKey,
1006        shutdown_signal: ShutdownSignal,
1007        config: Option<UdpConfig>,
1008        use_vector_namespace: bool,
1009    ) -> (SocketAddr, JoinHandle<Result<(), ()>>) {
1010        let (guard, address, mut config) = match config {
1011            Some(config) => match config.address() {
1012                SocketListenAddr::SocketAddr(addr) => (None, addr, config),
1013                _ => panic!("listen address should not be systemd FD offset in tests"),
1014            },
1015            None => {
1016                let (guard, address) = next_addr();
1017                (
1018                    Some(guard),
1019                    address,
1020                    UdpConfig::from_address(address.into()),
1021                )
1022            }
1023        };
1024
1025        let config = if use_vector_namespace {
1026            config.set_log_namespace(Some(true));
1027            config
1028        } else {
1029            config
1030        };
1031
1032        let server = SocketConfig::from(config)
1033            .build(SourceContext {
1034                key: source_key.clone(),
1035                globals: GlobalOptions::default(),
1036                enrichment_tables: Default::default(),
1037                shutdown: shutdown_signal,
1038                out: sender,
1039                proxy: Default::default(),
1040                acknowledgements: false,
1041                schema: Default::default(),
1042                schema_definitions: HashMap::default(),
1043                extra_context: Default::default(),
1044                metrics_storage: Default::default(),
1045            })
1046            .await
1047            .unwrap();
1048        let source_handle = tokio::spawn(server);
1049
1050        // Wait for UDP to start listening
1051        tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
1052
1053        if let Some(guard) = guard {
1054            drop(guard)
1055        }
1056
1057        (address, source_handle)
1058    }
1059
1060    #[tokio::test]
1061    async fn udp_message() {
1062        assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async {
1063            let (tx, rx) = SourceSender::new_test();
1064            let address = init_udp(tx, false).await;
1065
1066            send_lines_udp(address, vec!["test".to_string()]).await;
1067            let events = collect_n(rx, 1).await;
1068
1069            assert_eq!(
1070                events[0].as_log()[log_schema().message_key().unwrap().to_string()],
1071                "test".into()
1072            );
1073        })
1074        .await;
1075    }
1076
1077    #[tokio::test]
1078    async fn udp_message_preserves_newline() {
1079        assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async {
1080            let (tx, rx) = SourceSender::new_test();
1081            let address = init_udp(tx, false).await;
1082
1083            send_lines_udp(address, vec!["foo\nbar".to_string()]).await;
1084            let events = collect_n(rx, 1).await;
1085
1086            assert_eq!(
1087                events[0].as_log()[log_schema().message_key().unwrap().to_string()],
1088                "foo\nbar".into()
1089            );
1090        })
1091        .await;
1092    }
1093
1094    #[tokio::test]
1095    async fn udp_multiple_packets() {
1096        assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async {
1097            let (tx, rx) = SourceSender::new_test();
1098            let address = init_udp(tx, false).await;
1099
1100            send_lines_udp(address, vec!["test".to_string(), "test2".to_string()]).await;
1101            let events = collect_n(rx, 2).await;
1102
1103            assert_eq!(
1104                events[0].as_log()[log_schema().message_key().unwrap().to_string()],
1105                "test".into()
1106            );
1107            assert_eq!(
1108                events[1].as_log()[log_schema().message_key().unwrap().to_string()],
1109                "test2".into()
1110            );
1111        })
1112        .await;
1113    }
1114
1115    #[tokio::test]
1116    async fn udp_max_length() {
1117        assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async {
1118            let (tx, rx) = SourceSender::new_test();
1119            let (_, address) = next_addr();
1120            let mut config = UdpConfig::from_address(address.into());
1121            config.max_length = 11;
1122            let address = init_udp_with_config(tx, config).await;
1123
1124            send_lines_udp(
1125                address,
1126                vec![
1127                    "short line".to_string(),
1128                    "test with a long line".to_string(),
1129                    "a short un".to_string(),
1130                ],
1131            )
1132            .await;
1133
1134            let events = collect_n(rx, 2).await;
1135            assert_eq!(
1136                events[0].as_log()[log_schema().message_key().unwrap().to_string()],
1137                "short line".into()
1138            );
1139            assert_eq!(
1140                events[1].as_log()[log_schema().message_key().unwrap().to_string()],
1141                "a short un".into()
1142            );
1143        })
1144        .await;
1145    }
1146
1147    #[cfg(unix)]
1148    #[tokio::test]
1149    /// This test only works on Unix.
1150    /// Unix truncates at max_length giving us the bytes to get the first n delimited messages.
1151    /// Windows will drop the entire packet if we exceed the max_length so we are unable to
1152    /// extract anything.
1153    async fn udp_max_length_delimited() {
1154        assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async {
1155            let (tx, rx) = SourceSender::new_test();
1156            let (_, address) = next_addr();
1157            let mut config = UdpConfig::from_address(address.into());
1158            config.max_length = 10;
1159            config.framing = Some(
1160                CharacterDelimitedDecoderConfig {
1161                    character_delimited: CharacterDelimitedDecoderOptions::new(b',', None),
1162                }
1163                .into(),
1164            );
1165            let address = init_udp_with_config(tx, config).await;
1166
1167            send_lines_udp(
1168                address,
1169                vec!["test with, long line".to_string(), "short one".to_string()],
1170            )
1171            .await;
1172
1173            let events = collect_n(rx, 2).await;
1174            assert_eq!(
1175                events[0].as_log()[log_schema().message_key().unwrap().to_string()],
1176                "test with".into()
1177            );
1178            assert_eq!(
1179                events[1].as_log()[log_schema().message_key().unwrap().to_string()],
1180                "short one".into()
1181            );
1182        })
1183        .await;
1184    }
1185
1186    #[tokio::test]
1187    async fn udp_decodes_chunked_gelf_messages() {
1188        assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async {
1189            let (tx, rx) = SourceSender::new_test();
1190            let (_, address) = next_addr();
1191            let mut config = UdpConfig::from_address(address.into());
1192            config.decoding = GelfDeserializerConfig::default().into();
1193            let address = init_udp_with_config(tx, config).await;
1194            let seed = 42;
1195            let mut rng = SmallRng::seed_from_u64(seed);
1196            let max_size = 300;
1197            let big_message = "This is a very large message".repeat(500);
1198            let another_big_message = "This is another very large message".repeat(500);
1199            let mut chunks = get_gelf_chunks(big_message.as_str(), max_size, &mut rng);
1200            let mut another_chunks =
1201                get_gelf_chunks(another_big_message.as_str(), max_size, &mut rng);
1202            chunks.append(&mut another_chunks);
1203            chunks.shuffle(&mut rng);
1204
1205            send_packets_udp(address, chunks).await;
1206
1207            let events = collect_n(rx, 2).await;
1208            assert_eq!(
1209                events[1].as_log()[log_schema().message_key().unwrap().to_string()],
1210                big_message.into()
1211            );
1212            assert_eq!(
1213                events[0].as_log()[log_schema().message_key().unwrap().to_string()],
1214                another_big_message.into()
1215            );
1216        })
1217        .await;
1218    }
1219
1220    #[tokio::test]
1221    async fn udp_it_includes_host() {
1222        assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async {
1223            let (tx, rx) = SourceSender::new_test();
1224            let address = init_udp(tx, false).await;
1225
1226            let from = send_lines_udp(address, vec!["test".to_string()]).await;
1227            let events = collect_n(rx, 1).await;
1228
1229            assert_eq!(
1230                events[0].as_log()["host"],
1231                from.local_addr().unwrap().ip().to_string().into()
1232            );
1233            assert_eq!(
1234                events[0].as_log()["port"],
1235                from.local_addr().unwrap().port().into()
1236            );
1237        })
1238        .await;
1239    }
1240
1241    #[tokio::test]
1242    async fn udp_it_includes_vector_namespaced_fields() {
1243        assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async {
1244            let (tx, rx) = SourceSender::new_test();
1245            let address = init_udp(tx, true).await;
1246
1247            let from = send_lines_udp(address, vec!["test".to_string()]).await;
1248            let events = collect_n(rx, 1).await;
1249            let log = events[0].as_log();
1250            let event_meta = log.metadata().value();
1251
1252            assert_eq!(log.value(), &"test".into());
1253            assert_eq!(
1254                event_meta.get(path!("vector", "source_type")).unwrap(),
1255                &value!(SocketConfig::NAME)
1256            );
1257            assert_eq!(
1258                event_meta.get(path!(SocketConfig::NAME, "host")).unwrap(),
1259                &value!(from.local_addr().unwrap().ip().to_string())
1260            );
1261            assert_eq!(
1262                event_meta.get(path!(SocketConfig::NAME, "port")).unwrap(),
1263                &value!(from.local_addr().unwrap().port())
1264            );
1265        })
1266        .await;
1267    }
1268
1269    #[tokio::test]
1270    async fn udp_it_includes_source_type() {
1271        assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async {
1272            let (tx, rx) = SourceSender::new_test();
1273            let address = init_udp(tx, false).await;
1274
1275            _ = send_lines_udp(address, vec!["test".to_string()]).await;
1276            let events = collect_n(rx, 1).await;
1277
1278            assert_eq!(
1279                events[0].as_log()[log_schema().source_type_key().unwrap().to_string()],
1280                "socket".into()
1281            );
1282        })
1283        .await;
1284    }
1285
1286    #[tokio::test]
1287    async fn udp_shutdown_simple() {
1288        assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async {
1289            let (tx, rx) = SourceSender::new_test();
1290            let source_id = ComponentKey::from("udp_shutdown_simple");
1291
1292            let mut shutdown = SourceShutdownCoordinator::default();
1293            let (address, source_handle) =
1294                init_udp_with_shutdown(tx, &source_id, &mut shutdown).await;
1295
1296            send_lines_udp(address, vec!["test".to_string()]).await;
1297            let events = collect_n(rx, 1).await;
1298
1299            assert_eq!(
1300                events[0].as_log()[log_schema().message_key().unwrap().to_string()],
1301                "test".into()
1302            );
1303
1304            // Now signal to the Source to shut down.
1305            let deadline = Instant::now() + Duration::from_secs(10);
1306            let shutdown_complete = shutdown.shutdown_source(&source_id, deadline);
1307            let shutdown_success = shutdown_complete.await;
1308            assert!(shutdown_success);
1309
1310            // Ensure source actually shut down successfully.
1311            _ = source_handle.await.unwrap();
1312        })
1313        .await;
1314    }
1315
    #[tokio::test]
    async fn udp_shutdown_infinite_stream() {
        // Verifies that the UDP source still shuts down on request while a sender keeps flooding
        // it with datagrams.
        assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async {
            let (tx, rx) = SourceSender::new_test();
            let source_id = ComponentKey::from("udp_shutdown_infinite_stream");

            let mut shutdown = SourceShutdownCoordinator::default();
            let (address, source_handle) =
                init_udp_with_shutdown(tx, &source_id, &mut shutdown).await;

            // Stream that keeps sending lines to the UDP source forever.
            // The pump runs via `spawn_blocking`, presumably because the UDP send helpers call the
            // blocking `thread::sleep`. It keeps sending until `run_pump_atomic_sender` is set to
            // `false` near the end of the test.
            let run_pump_atomic_sender = Arc::new(AtomicBool::new(true));
            let run_pump_atomic_receiver = Arc::clone(&run_pump_atomic_sender);
            let pump_handle = tokio::task::spawn_blocking(move || {
                let handle = tokio::runtime::Handle::current();
                handle.block_on(send_lines_udp(
                    address,
                    std::iter::repeat("test".to_string())
                        .take_while(move |_| run_pump_atomic_receiver.load(Ordering::Relaxed)),
                ));
            });

            // Important that 'rx' doesn't get dropped until the pump has finished sending items to it.
            // NOTE(review): `rx` is moved into `collect_n` here, so whether it actually outlives the
            // pump depends on what `collect_n` does with it. Confirm this comment is still accurate.
            let events = collect_n(rx, 100).await;
            assert_eq!(100, events.len());
            for event in events {
                assert_eq!(
                    event.as_log()[log_schema().message_key().unwrap().to_string()],
                    "test".into()
                );
            }

            // Request shutdown while the pump is still sending.
            let deadline = Instant::now() + Duration::from_secs(10);
            let shutdown_complete = shutdown.shutdown_source(&source_id, deadline);
            let shutdown_success = shutdown_complete.await;
            assert!(shutdown_success);

            // Ensure that the source has actually shut down.
            _ = source_handle.await.unwrap();

            // Stop the pump from sending lines forever.
            run_pump_atomic_sender.store(false, Ordering::Relaxed);
            assert!(pump_handle.await.is_ok());
        })
        .await;
    }
1362
1363    #[tokio::test]
1364    async fn multicast_udp_message() {
1365        assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async {
1366            let (tx, mut rx) = SourceSender::new_test();
1367            // The socket address must be `IPADDR_ANY` (0.0.0.0) in order to receive multicast packets
1368            let (_guard, socket_address) = next_addr_any();
1369            let multicast_ip_address: Ipv4Addr = "224.0.0.2".parse().unwrap();
1370            let multicast_socket_address =
1371                SocketAddr::new(IpAddr::V4(multicast_ip_address), socket_address.port());
1372            let mut config = UdpConfig::from_address(socket_address.into());
1373            config.multicast_groups = vec![multicast_ip_address];
1374            init_udp_with_config(tx, config).await;
1375
1376            // We must send packets to the same interface the `socket_address` is bound to
1377            // in order to receive the multicast packets the `from` socket sends.
1378            // To do so, we use the `IPADDR_ANY` address
1379            send_lines_udp_from(
1380                bind_unused_udp_any(),
1381                multicast_socket_address,
1382                ["test".to_string()],
1383            );
1384
1385            let event = rx.next().await.expect("must receive an event");
1386            assert_eq!(
1387                event.as_log()[log_schema().message_key().unwrap().to_string()],
1388                "test".into()
1389            );
1390        })
1391        .await;
1392    }
1393
    #[tokio::test]
    async fn multiple_multicast_addresses_udp_message() {
        // Joins ten multicast groups (224.0.0.2 through 224.0.0.11) and checks that a datagram
        // sent to each group is received, one group at a time.
        assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async {
            let (tx, mut rx) = SourceSender::new_test();
            // Bound to `IPADDR_ANY` (0.0.0.0) so multicast packets can be received.
            let (_guard, socket_address) = next_addr_any();
            let multicast_ip_addresses = (2..12)
                .map(|i| format!("224.0.0.{i}").parse().unwrap())
                .collect::<Vec<Ipv4Addr>>();
            let multicast_ip_socket_addresses = multicast_ip_addresses
                .iter()
                .map(|ip_address| SocketAddr::new(IpAddr::V4(*ip_address), socket_address.port()))
                .collect::<Vec<SocketAddr>>();
            let mut config = UdpConfig::from_address(socket_address.into());
            config.multicast_groups = multicast_ip_addresses;
            init_udp_with_config(tx, config).await;

            // One sending socket is threaded through every iteration. Each datagram's payload is
            // its own destination address, so the received event identifies which group
            // delivered it.
            let mut from = bind_unused_udp_any();
            for multicast_ip_socket_address in multicast_ip_socket_addresses {
                from = send_lines_udp_from(
                    from,
                    multicast_ip_socket_address,
                    [multicast_ip_socket_address.to_string()],
                );

                let event = rx.next().await.expect("must receive an event");
                assert_eq!(
                    event.as_log()[log_schema().message_key().unwrap().to_string()],
                    multicast_ip_socket_address.to_string().into()
                );
            }
        })
        .await;
    }
1427
    #[tokio::test]
    async fn multicast_and_unicast_udp_message() {
        // A source bound to 0.0.0.0 that has joined a multicast group must still receive ordinary
        // unicast datagrams sent to its port.
        assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async {
            let (tx, mut rx) = SourceSender::new_test();
            let (_guard, socket_address) = next_addr_any();
            let multicast_ip_address: Ipv4Addr = "224.0.0.2".parse().unwrap();
            let multicast_socket_address =
                SocketAddr::new(IpAddr::V4(multicast_ip_address), socket_address.port());
            let mut config = UdpConfig::from_address(socket_address.into());
            config.multicast_groups = vec![multicast_ip_address];
            init_udp_with_config(tx, config).await;

            // Send packet to multicast address
            let _ = send_lines_udp_from(
                bind_unused_udp_any(),
                multicast_socket_address,
                ["test".to_string()],
            );
            let event = rx.next().await.expect("must receive an event");
            assert_eq!(
                event.as_log()[log_schema().message_key().unwrap().to_string()],
                "test".into()
            );

            // Windows does not support connecting to `0.0.0.0`,
            // therefore we connect to `127.0.0.1` instead (the socket is listening at `0.0.0.0`)
            let to = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), socket_address.port());
            // Send packet to unicast address
            // Use a fresh socket - on macOS, a socket bound to 0.0.0.0 that sends to multicast
            // cannot subsequently send unicast packets that the listener receives
            send_lines_udp_from(bind_unused_udp(), to, ["test".to_string()]);
            let event = rx.next().await.expect("must receive an event");
            assert_eq!(
                event.as_log()[log_schema().message_key().unwrap().to_string()],
                "test".into()
            );
        })
        .await;
    }
1467
1468    #[tokio::test]
1469    async fn udp_invalid_multicast_group() {
1470        assert_source_error(&COMPONENT_ERROR_TAGS, async {
1471            let (tx, _rx) = SourceSender::new_test();
1472            let (_, socket_address) = next_addr_any();
1473            let invalid_multicast_ip_address: Ipv4Addr = "192.168.0.3".parse().unwrap();
1474            let mut config = UdpConfig::from_address(socket_address.into());
1475            config.multicast_groups = vec![invalid_multicast_ip_address];
1476            init_udp_with_config(tx, config).await;
1477        })
1478        .await;
1479    }
1480
1481    ////////////// UNIX TEST LIBS //////////////
1482
1483    #[cfg(unix)]
1484    async fn init_unix(sender: SourceSender, stream: bool, use_vector_namespace: bool) -> PathBuf {
1485        init_unix_inner(sender, stream, use_vector_namespace, None).await
1486    }
1487
1488    #[cfg(unix)]
1489    async fn init_unix_with_config(
1490        sender: SourceSender,
1491        stream: bool,
1492        use_vector_namespace: bool,
1493        config: UnixConfig,
1494    ) -> PathBuf {
1495        init_unix_inner(sender, stream, use_vector_namespace, Some(config)).await
1496    }
1497
1498    #[cfg(unix)]
1499    async fn init_unix_inner(
1500        sender: SourceSender,
1501        stream: bool,
1502        use_vector_namespace: bool,
1503        config: Option<UnixConfig>,
1504    ) -> PathBuf {
1505        let mut config = config.unwrap_or_else(|| {
1506            UnixConfig::new(tempfile::tempdir().unwrap().keep().join("unix_test"))
1507        });
1508
1509        let in_path = config.path.clone();
1510
1511        if use_vector_namespace {
1512            config.log_namespace = Some(true);
1513        }
1514
1515        let mode = if stream {
1516            Mode::UnixStream(config)
1517        } else {
1518            Mode::UnixDatagram(config)
1519        };
1520
1521        let server = SocketConfig { mode }
1522            .build(SourceContext::new_test(sender, None))
1523            .await
1524            .unwrap();
1525        tokio::spawn(server);
1526
1527        // Wait for server to accept traffic
1528        while if stream {
1529            std::os::unix::net::UnixStream::connect(&in_path).is_err()
1530        } else {
1531            let socket = std::os::unix::net::UnixDatagram::unbound().unwrap();
1532            socket.connect(&in_path).is_err()
1533        } {
1534            yield_now().await;
1535        }
1536
1537        in_path
1538    }
1539
1540    #[cfg(unix)]
1541    async fn unix_send_lines(stream: bool, path: PathBuf, lines: &[&str]) {
1542        match stream {
1543            false => send_lines_unix_datagram(path, lines).await,
1544            true => send_lines_unix_stream(path, lines).await,
1545        }
1546    }
1547
1548    #[cfg(unix)]
1549    async fn unix_message(
1550        message: &str,
1551        stream: bool,
1552        use_vector_namespace: bool,
1553    ) -> (PathBuf, impl Stream<Item = Event> + use<>) {
1554        let (tx, rx) = SourceSender::new_test();
1555        let path = init_unix(tx, stream, use_vector_namespace).await;
1556        let path_clone = path.clone();
1557
1558        unix_send_lines(stream, path, &[message]).await;
1559
1560        (path_clone, rx)
1561    }
1562
1563    #[cfg(unix)]
1564    async fn unix_multiple_packets(stream: bool) {
1565        let (tx, rx) = SourceSender::new_test();
1566        let path = init_unix(tx, stream, false).await;
1567
1568        unix_send_lines(stream, path, &["test", "test2"]).await;
1569        let events = collect_n(rx, 2).await;
1570
1571        assert_eq!(2, events.len());
1572        assert_eq!(
1573            events[0].as_log()[log_schema().message_key().unwrap().to_string()],
1574            "test".into()
1575        );
1576        assert_eq!(
1577            events[1].as_log()[log_schema().message_key().unwrap().to_string()],
1578            "test2".into()
1579        );
1580    }
1581
1582    #[cfg(unix)]
1583    fn parses_unix_config(mode: &str) -> SocketConfig {
1584        toml::from_str::<SocketConfig>(&format!(
1585            r#"
1586               mode = "{mode}"
1587               path = "/does/not/exist"
1588            "#
1589        ))
1590        .unwrap()
1591    }
1592
1593    #[cfg(unix)]
1594    fn parses_unix_config_file_mode(mode: &str) -> SocketConfig {
1595        toml::from_str::<SocketConfig>(&format!(
1596            r#"
1597               mode = "{mode}"
1598               path = "/does/not/exist"
1599               socket_file_mode = 0o777
1600            "#
1601        ))
1602        .unwrap()
1603    }
1604
1605    ////////////// UNIX DATAGRAM TESTS //////////////
1606    #[cfg(unix)]
1607    async fn send_lines_unix_datagram(path: PathBuf, lines: &[&str]) {
1608        let packets = lines.iter().map(|line| Bytes::from(line.to_string()));
1609        send_packets_unix_datagram(path, packets).await;
1610    }
1611
1612    #[cfg(unix)]
1613    async fn send_packets_unix_datagram(path: PathBuf, packets: impl IntoIterator<Item = Bytes>) {
1614        let socket = UnixDatagram::unbound().unwrap();
1615        socket.connect(path).unwrap();
1616
1617        for packet in packets {
1618            socket.send(&packet).await.unwrap();
1619        }
1620        socket.shutdown(std::net::Shutdown::Both).unwrap();
1621    }
1622
1623    #[cfg(unix)]
1624    #[tokio::test]
1625    async fn unix_datagram_message() {
1626        assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async {
1627            let (_, rx) = unix_message("test", false, false).await;
1628            let events = collect_n(rx, 1).await;
1629
1630            assert_eq!(events.len(), 1);
1631            assert_eq!(
1632                events[0].as_log()[log_schema().message_key().unwrap().to_string()],
1633                "test".into()
1634            );
1635            assert_eq!(
1636                events[0].as_log()[log_schema().source_type_key().unwrap().to_string()],
1637                "socket".into()
1638            );
1639            assert_eq!(events[0].as_log()["host"], UNNAMED_SOCKET_HOST.into());
1640        })
1641        .await;
1642    }
1643
1644    #[ignore]
1645    #[cfg(unix)]
1646    #[tokio::test]
1647    async fn unix_datagram_socket_test() {
1648        // This test is useful for testing the behavior of datagram
1649        // sockets.
1650
1651        use tempfile::tempdir;
1652        use tokio::net::UnixDatagram;
1653
1654        let tmp = tempdir().unwrap();
1655
1656        let tx_path = tmp.path().join("tx");
1657
1658        // Switch this var between "bound" and "unbound" to test
1659        // different types of socket behavior.
1660        let tx_type = "bound";
1661
1662        let tx = if tx_type == "bound" {
1663            UnixDatagram::bind(&tx_path).unwrap()
1664        } else {
1665            UnixDatagram::unbound().unwrap()
1666        };
1667
1668        // The following debug statements showcase some useful info:
1669        // dbg!(tx.local_addr().unwrap());
1670        // dbg!(std::os::unix::prelude::AsRawFd::as_raw_fd(&tx));
1671
1672        // Create another, bound socket
1673        let rx_path = tmp.path().join("rx");
1674        let rx = UnixDatagram::bind(&rx_path).unwrap();
1675
1676        // Connect to the bound socket
1677        tx.connect(&rx_path).unwrap();
1678
1679        // Send to the bound socket
1680        let bytes = b"hello world";
1681        tx.send(bytes).await.unwrap();
1682
1683        let mut buf = vec![0u8; 24];
1684        let (size, _) = rx.recv_from(&mut buf).await.unwrap();
1685
1686        let dgram = &buf[..size];
1687        assert_eq!(dgram, bytes);
1688    }
1689
1690    #[cfg(unix)]
1691    #[tokio::test]
1692    async fn unix_datagram_chunked_gelf_messages() {
1693        assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async {
1694            let (tx, rx) = SourceSender::new_test();
1695            let in_path = tempfile::tempdir().unwrap().keep().join("unix_test");
1696            let mut config = UnixConfig::new(in_path.clone());
1697            config.decoding = GelfDeserializerConfig::default().into();
1698            let path = init_unix_with_config(tx, false, false, config).await;
1699            let seed = 42;
1700            let mut rng = SmallRng::seed_from_u64(seed);
1701            let max_size = 20;
1702            let big_message = "This is a very large message".repeat(5);
1703            let another_big_message = "This is another very large message".repeat(5);
1704            let mut chunks = get_gelf_chunks(big_message.as_str(), max_size, &mut rng);
1705            let mut another_chunks =
1706                get_gelf_chunks(another_big_message.as_str(), max_size, &mut rng);
1707            chunks.append(&mut another_chunks);
1708            chunks.shuffle(&mut rng);
1709
1710            send_packets_unix_datagram(path, chunks).await;
1711
1712            let events = collect_n(rx, 2).await;
1713            assert_eq!(
1714                events[0].as_log()[log_schema().message_key().unwrap().to_string()],
1715                big_message.into()
1716            );
1717            assert_eq!(
1718                events[1].as_log()[log_schema().message_key().unwrap().to_string()],
1719                another_big_message.into()
1720            );
1721        })
1722        .await;
1723    }
1724
1725    #[cfg(unix)]
1726    #[tokio::test]
1727    async fn unix_datagram_message_with_vector_namespace() {
1728        assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async {
1729            let (_, rx) = unix_message("test", false, true).await;
1730            let events = collect_n(rx, 1).await;
1731            let log = events[0].as_log();
1732            let event_meta = log.metadata().value();
1733
1734            assert_eq!(log.value(), &"test".into());
1735            assert_eq!(events.len(), 1);
1736
1737            assert_eq!(
1738                event_meta.get(path!("vector", "source_type")).unwrap(),
1739                &value!(SocketConfig::NAME)
1740            );
1741
1742            assert_eq!(
1743                event_meta.get(path!(SocketConfig::NAME, "host")).unwrap(),
1744                &value!(UNNAMED_SOCKET_HOST)
1745            );
1746        })
1747        .await;
1748    }
1749
1750    #[cfg(unix)]
1751    #[tokio::test]
1752    async fn unix_datagram_message_preserves_newline() {
1753        assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async {
1754            let (_, rx) = unix_message("foo\nbar", false, false).await;
1755            let events = collect_n(rx, 1).await;
1756
1757            assert_eq!(events.len(), 1);
1758            assert_eq!(
1759                events[0].as_log()[log_schema().message_key().unwrap().to_string()],
1760                "foo\nbar".into()
1761            );
1762            assert_eq!(
1763                events[0].as_log()[log_schema().source_type_key().unwrap().to_string()],
1764                "socket".into()
1765            );
1766        })
1767        .await;
1768    }
1769
1770    #[cfg(unix)]
1771    #[tokio::test]
1772    async fn unix_datagram_multiple_packets() {
1773        assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async {
1774            unix_multiple_packets(false).await
1775        })
1776        .await;
1777    }
1778
1779    #[cfg(unix)]
1780    #[test]
1781    fn parses_unix_datagram_config() {
1782        let config = parses_unix_config("unix_datagram");
1783        assert!(matches!(config.mode, Mode::UnixDatagram { .. }));
1784    }
1785
1786    #[cfg(unix)]
1787    #[test]
1788    fn parses_unix_datagram_perms() {
1789        let config = parses_unix_config_file_mode("unix_datagram");
1790        assert!(matches!(config.mode, Mode::UnixDatagram { .. }));
1791    }
1792
1793    #[cfg(unix)]
1794    #[tokio::test]
1795    async fn unix_datagram_permissions() {
1796        let in_path = tempfile::tempdir().unwrap().keep().join("unix_test");
1797        let (tx, _) = SourceSender::new_test();
1798
1799        let mut config = UnixConfig::new(in_path.clone());
1800        config.socket_file_mode = Some(0o555);
1801        let mode = Mode::UnixDatagram(config);
1802        let server = SocketConfig { mode }
1803            .build(SourceContext::new_test(tx, None))
1804            .await
1805            .unwrap();
1806        tokio::spawn(server);
1807
1808        wait_for(|| {
1809            match std::fs::metadata(&in_path) {
1810                Ok(meta) => {
1811                    match meta.permissions().mode() {
1812                        // S_IFSOCK   0140000   socket
1813                        0o140555 => ready(true),
1814                        _ => ready(false),
1815                    }
1816                }
1817                Err(_) => ready(false),
1818            }
1819        })
1820        .await;
1821    }
1822
1823    ////////////// UNIX STREAM TESTS //////////////
1824    #[cfg(unix)]
1825    async fn send_lines_unix_stream(path: PathBuf, lines: &[&str]) {
1826        let socket = UnixStream::connect(path).await.unwrap();
1827        let mut sink = FramedWrite::new(socket, LinesCodec::new());
1828
1829        let lines = lines.iter().map(|s| Ok(s.to_string()));
1830        let lines = lines.collect::<Vec<_>>();
1831        sink.send_all(&mut stream::iter(lines)).await.unwrap();
1832
1833        let mut socket = sink.into_inner();
1834        socket.shutdown().await.unwrap();
1835    }
1836
1837    #[cfg(unix)]
1838    #[tokio::test]
1839    async fn unix_stream_message() {
1840        assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async {
1841            let (_, rx) = unix_message("test", true, false).await;
1842            let events = collect_n(rx, 1).await;
1843
1844            assert_eq!(1, events.len());
1845            assert_eq!(
1846                events[0].as_log()[log_schema().message_key().unwrap().to_string()],
1847                "test".into()
1848            );
1849            assert_eq!(
1850                events[0].as_log()[log_schema().source_type_key().unwrap().to_string()],
1851                "socket".into()
1852            );
1853        })
1854        .await;
1855    }
1856
1857    #[cfg(unix)]
1858    #[tokio::test]
1859    async fn unix_stream_message_with_vector_namespace() {
1860        assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async {
1861            let (_, rx) = unix_message("test", true, true).await;
1862            let events = collect_n(rx, 1).await;
1863            let log = events[0].as_log();
1864            let event_meta = log.metadata().value();
1865
1866            assert_eq!(log.value(), &"test".into());
1867            assert_eq!(1, events.len());
1868            assert_eq!(
1869                event_meta.get(path!("vector", "source_type")).unwrap(),
1870                &value!(SocketConfig::NAME)
1871            );
1872            assert_eq!(
1873                event_meta.get(path!(SocketConfig::NAME, "host")).unwrap(),
1874                &value!(UNNAMED_SOCKET_HOST)
1875            );
1876        })
1877        .await;
1878    }
1879
1880    #[cfg(unix)]
1881    #[tokio::test]
1882    async fn unix_stream_message_splits_on_newline() {
1883        assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async {
1884            let (_, rx) = unix_message("foo\nbar", true, false).await;
1885            let events = collect_n(rx, 2).await;
1886
1887            assert_eq!(events.len(), 2);
1888            assert_eq!(
1889                events[0].as_log()[log_schema().message_key().unwrap().to_string()],
1890                "foo".into()
1891            );
1892            assert_eq!(
1893                events[0].as_log()[log_schema().source_type_key().unwrap().to_string()],
1894                "socket".into()
1895            );
1896            assert_eq!(
1897                events[1].as_log()[log_schema().message_key().unwrap().to_string()],
1898                "bar".into()
1899            );
1900            assert_eq!(
1901                events[1].as_log()[log_schema().source_type_key().unwrap().to_string()],
1902                "socket".into()
1903            );
1904        })
1905        .await;
1906    }
1907
1908    #[cfg(unix)]
1909    #[tokio::test]
1910    async fn unix_stream_multiple_packets() {
1911        assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async {
1912            unix_multiple_packets(true).await
1913        })
1914        .await;
1915    }
1916
1917    #[cfg(unix)]
1918    #[test]
1919    fn parses_new_unix_stream_config() {
1920        let config = parses_unix_config("unix_stream");
1921        assert!(matches!(config.mode, Mode::UnixStream { .. }));
1922    }
1923
1924    #[cfg(unix)]
1925    #[test]
1926    fn parses_new_unix_datagram_perms() {
1927        let config = parses_unix_config_file_mode("unix_stream");
1928        assert!(matches!(config.mode, Mode::UnixStream { .. }));
1929    }
1930
1931    #[cfg(unix)]
1932    #[test]
1933    fn parses_old_unix_stream_config() {
1934        let config = parses_unix_config("unix");
1935        assert!(matches!(config.mode, Mode::UnixStream { .. }));
1936    }
1937
1938    #[cfg(unix)]
1939    #[tokio::test]
1940    async fn unix_stream_permissions() {
1941        let in_path = tempfile::tempdir().unwrap().keep().join("unix_test");
1942        let (tx, _) = SourceSender::new_test();
1943
1944        let mut config = UnixConfig::new(in_path.clone());
1945        config.socket_file_mode = Some(0o421);
1946        let mode = Mode::UnixStream(config);
1947        let server = SocketConfig { mode }
1948            .build(SourceContext::new_test(tx, None))
1949            .await
1950            .unwrap();
1951        tokio::spawn(server);
1952
1953        wait_for(|| {
1954            match std::fs::metadata(&in_path) {
1955                Ok(meta) => {
1956                    match meta.permissions().mode() {
1957                        // S_IFSOCK   0140000   socket
1958                        0o140421 => ready(true),
1959                        _ => ready(false),
1960                    }
1961                }
1962                Err(_) => ready(false),
1963            }
1964        })
1965        .await;
1966    }
1967}