vector/sources/socket/
mod.rs

1pub mod tcp;
2pub mod udp;
3#[cfg(unix)]
4mod unix;
5
6use vector_lib::{
7    codecs::decoding::DeserializerConfig,
8    config::{LegacyKey, LogNamespace, log_schema},
9    configurable::configurable_component,
10    lookup::{lookup_v2::OptionalValuePath, owned_value_path},
11};
12use vrl::value::{Kind, kind::Collection};
13
14use crate::{
15    codecs::DecodingConfig,
16    config::{GenerateConfig, Resource, SourceConfig, SourceContext, SourceOutput},
17    sources::util::net::TcpSource,
18    tls::MaybeTlsSettings,
19};
20
21/// Configuration for the `socket` source.
22#[configurable_component(source("socket", "Collect logs over a socket."))]
23#[derive(Clone, Debug)]
24pub struct SocketConfig {
25    #[serde(flatten)]
26    pub mode: Mode,
27}
28
29/// Listening mode for the `socket` source.
30#[configurable_component]
31#[derive(Clone, Debug)]
32#[serde(tag = "mode", rename_all = "snake_case")]
33#[configurable(metadata(docs::enum_tag_description = "The type of socket to use."))]
34#[allow(clippy::large_enum_variant)] // just used for configuration
35pub enum Mode {
36    /// Listen on TCP.
37    Tcp(tcp::TcpConfig),
38
39    /// Listen on UDP.
40    Udp(udp::UdpConfig),
41
42    /// Listen on a Unix domain socket (UDS), in datagram mode.
43    #[cfg(unix)]
44    UnixDatagram(unix::UnixConfig),
45
46    /// Listen on a Unix domain socket (UDS), in stream mode.
47    #[cfg(unix)]
48    #[serde(alias = "unix")]
49    UnixStream(unix::UnixConfig),
50}
51
52impl SocketConfig {
53    pub fn new_tcp(tcp_config: tcp::TcpConfig) -> Self {
54        tcp_config.into()
55    }
56
57    pub fn make_basic_tcp_config(addr: std::net::SocketAddr) -> Self {
58        tcp::TcpConfig::from_address(addr.into()).into()
59    }
60
61    fn decoding(&self) -> DeserializerConfig {
62        match &self.mode {
63            Mode::Tcp(config) => config.decoding().clone(),
64            Mode::Udp(config) => config.decoding().clone(),
65            #[cfg(unix)]
66            Mode::UnixDatagram(config) => config.decoding().clone(),
67            #[cfg(unix)]
68            Mode::UnixStream(config) => config.decoding().clone(),
69        }
70    }
71
72    fn log_namespace(&self, global_log_namespace: LogNamespace) -> LogNamespace {
73        match &self.mode {
74            Mode::Tcp(config) => global_log_namespace.merge(config.log_namespace),
75            Mode::Udp(config) => global_log_namespace.merge(config.log_namespace),
76            #[cfg(unix)]
77            Mode::UnixDatagram(config) => global_log_namespace.merge(config.log_namespace),
78            #[cfg(unix)]
79            Mode::UnixStream(config) => global_log_namespace.merge(config.log_namespace),
80        }
81    }
82}
83
84impl From<tcp::TcpConfig> for SocketConfig {
85    fn from(config: tcp::TcpConfig) -> Self {
86        SocketConfig {
87            mode: Mode::Tcp(config),
88        }
89    }
90}
91
92impl From<udp::UdpConfig> for SocketConfig {
93    fn from(config: udp::UdpConfig) -> Self {
94        SocketConfig {
95            mode: Mode::Udp(config),
96        }
97    }
98}
99
100impl GenerateConfig for SocketConfig {
101    fn generate_config() -> toml::Value {
102        toml::from_str(
103            r#"mode = "tcp"
104            address = "0.0.0.0:9000""#,
105        )
106        .unwrap()
107    }
108}
109
110#[async_trait::async_trait]
111#[typetag::serde(name = "socket")]
112impl SourceConfig for SocketConfig {
113    async fn build(&self, cx: SourceContext) -> crate::Result<super::Source> {
114        match self.mode.clone() {
115            Mode::Tcp(config) => {
116                let log_namespace = cx.log_namespace(config.log_namespace);
117
118                let decoding = config.decoding().clone();
119                let decoder = DecodingConfig::new(
120                    config
121                        .framing
122                        .clone()
123                        .unwrap_or_else(|| decoding.default_stream_framing()),
124                    decoding,
125                    log_namespace,
126                )
127                .build()?;
128
129                let tcp = tcp::RawTcpSource::new(config.clone(), decoder, log_namespace);
130                let tls_config = config.tls().as_ref().map(|tls| tls.tls_config.clone());
131                let tls_client_metadata_key = config
132                    .tls()
133                    .as_ref()
134                    .and_then(|tls| tls.client_metadata_key.clone())
135                    .and_then(|k| k.path);
136                let tls = MaybeTlsSettings::from_config(tls_config.as_ref(), true)?;
137                tcp.run(
138                    config.address(),
139                    config.keepalive(),
140                    config.shutdown_timeout_secs(),
141                    tls,
142                    tls_client_metadata_key,
143                    config.receive_buffer_bytes(),
144                    config.max_connection_duration_secs(),
145                    cx,
146                    false.into(),
147                    config.connection_limit,
148                    config.permit_origin.map(Into::into),
149                    SocketConfig::NAME,
150                    log_namespace,
151                )
152            }
153            Mode::Udp(config) => {
154                let log_namespace = cx.log_namespace(config.log_namespace);
155                let decoding = config.decoding().clone();
156                let framing = config
157                    .framing()
158                    .clone()
159                    .unwrap_or_else(|| decoding.default_message_based_framing());
160                let decoder = DecodingConfig::new(framing, decoding, log_namespace).build()?;
161                Ok(udp::udp(
162                    config,
163                    decoder,
164                    cx.shutdown,
165                    cx.out,
166                    log_namespace,
167                ))
168            }
169            #[cfg(unix)]
170            Mode::UnixDatagram(config) => {
171                let log_namespace = cx.log_namespace(config.log_namespace);
172                let decoding = config.decoding.clone();
173                let framing = config
174                    .framing
175                    .clone()
176                    .unwrap_or_else(|| decoding.default_message_based_framing());
177                let decoder = DecodingConfig::new(framing, decoding, log_namespace).build()?;
178
179                unix::unix_datagram(config, decoder, cx.shutdown, cx.out, log_namespace)
180            }
181            #[cfg(unix)]
182            Mode::UnixStream(config) => {
183                let log_namespace = cx.log_namespace(config.log_namespace);
184
185                let decoding = config.decoding().clone();
186                let decoder = DecodingConfig::new(
187                    config
188                        .framing
189                        .clone()
190                        .unwrap_or_else(|| decoding.default_stream_framing()),
191                    decoding,
192                    log_namespace,
193                )
194                .build()?;
195
196                unix::unix_stream(config, decoder, cx.shutdown, cx.out, log_namespace)
197            }
198        }
199    }
200
201    fn outputs(&self, global_log_namespace: LogNamespace) -> Vec<SourceOutput> {
202        let log_namespace = self.log_namespace(global_log_namespace);
203
204        let schema_definition = self
205            .decoding()
206            .schema_definition(log_namespace)
207            .with_standard_vector_source_metadata();
208
209        let schema_definition = match &self.mode {
210            Mode::Tcp(config) => {
211                let legacy_host_key = config.host_key().path.map(LegacyKey::InsertIfEmpty);
212
213                let legacy_port_key = config.port_key().clone().path.map(LegacyKey::InsertIfEmpty);
214
215                let tls_client_metadata_path = config
216                    .tls()
217                    .as_ref()
218                    .and_then(|tls| tls.client_metadata_key.as_ref())
219                    .and_then(|k| k.path.clone())
220                    .map(LegacyKey::Overwrite);
221
222                schema_definition
223                    .with_source_metadata(
224                        Self::NAME,
225                        legacy_host_key,
226                        &owned_value_path!("host"),
227                        Kind::bytes(),
228                        Some("host"),
229                    )
230                    .with_source_metadata(
231                        Self::NAME,
232                        legacy_port_key,
233                        &owned_value_path!("port"),
234                        Kind::integer(),
235                        None,
236                    )
237                    .with_source_metadata(
238                        Self::NAME,
239                        tls_client_metadata_path,
240                        &owned_value_path!("tls_client_metadata"),
241                        Kind::object(Collection::empty().with_unknown(Kind::bytes()))
242                            .or_undefined(),
243                        None,
244                    )
245            }
246            Mode::Udp(config) => {
247                let legacy_host_key = config.host_key().path.map(LegacyKey::InsertIfEmpty);
248
249                let legacy_port_key = config.port_key().clone().path.map(LegacyKey::InsertIfEmpty);
250
251                schema_definition
252                    .with_source_metadata(
253                        Self::NAME,
254                        legacy_host_key,
255                        &owned_value_path!("host"),
256                        Kind::bytes(),
257                        None,
258                    )
259                    .with_source_metadata(
260                        Self::NAME,
261                        legacy_port_key,
262                        &owned_value_path!("port"),
263                        Kind::integer(),
264                        None,
265                    )
266            }
267            #[cfg(unix)]
268            Mode::UnixDatagram(config) => {
269                let legacy_host_key = config.host_key().clone().path.map(LegacyKey::InsertIfEmpty);
270
271                schema_definition.with_source_metadata(
272                    Self::NAME,
273                    legacy_host_key,
274                    &owned_value_path!("host"),
275                    Kind::bytes(),
276                    None,
277                )
278            }
279            #[cfg(unix)]
280            Mode::UnixStream(config) => {
281                let legacy_host_key = config.host_key().clone().path.map(LegacyKey::InsertIfEmpty);
282
283                schema_definition.with_source_metadata(
284                    Self::NAME,
285                    legacy_host_key,
286                    &owned_value_path!("host"),
287                    Kind::bytes(),
288                    None,
289                )
290            }
291        };
292
293        vec![SourceOutput::new_maybe_logs(
294            self.decoding().output_type(),
295            schema_definition,
296        )]
297    }
298
299    fn resources(&self) -> Vec<Resource> {
300        match self.mode.clone() {
301            Mode::Tcp(tcp) => vec![tcp.address().as_tcp_resource()],
302            Mode::Udp(udp) => vec![udp.address().as_udp_resource()],
303            #[cfg(unix)]
304            Mode::UnixDatagram(_) => vec![],
305            #[cfg(unix)]
306            Mode::UnixStream(_) => vec![],
307        }
308    }
309
310    fn can_acknowledge(&self) -> bool {
311        false
312    }
313}
314
315pub(crate) fn default_host_key() -> OptionalValuePath {
316    log_schema().host_key().cloned().into()
317}
318
319#[cfg(test)]
320mod test {
321    use std::{
322        collections::HashMap,
323        net::{IpAddr, Ipv4Addr, SocketAddr, UdpSocket},
324        sync::{
325            Arc, LazyLock,
326            atomic::{AtomicBool, Ordering},
327        },
328        thread,
329    };
330
331    use approx::assert_relative_eq;
332    use bytes::{BufMut, Bytes, BytesMut};
333    use futures::{StreamExt, stream};
334    use portpicker::pick_unused_port;
335    use rand::{SeedableRng, rngs::SmallRng, seq::SliceRandom};
336    use serde_json::json;
337    use tokio::{
338        io::AsyncReadExt,
339        net::TcpStream,
340        sync::{Mutex, MutexGuard},
341        task::JoinHandle,
342        time::{Duration, Instant, timeout},
343    };
344    #[cfg(unix)]
345    use vector_lib::codecs::{
346        CharacterDelimitedDecoderConfig, decoding::CharacterDelimitedDecoderOptions,
347    };
348    use vector_lib::{
349        codecs::{GelfDeserializerConfig, NewlineDelimitedDecoderConfig},
350        event::EventContainer,
351        lookup::{lookup_v2::OptionalValuePath, owned_value_path, path},
352    };
353    use vrl::{btreemap, value, value::ObjectMap};
354    #[cfg(unix)]
355    use {
356        super::{Mode, unix::UnixConfig},
357        crate::sources::util::unix::UNNAMED_SOCKET_HOST,
358        crate::test_util::wait_for,
359        futures::{SinkExt, Stream},
360        std::future::ready,
361        std::os::unix::fs::PermissionsExt,
362        std::path::PathBuf,
363        tokio::{
364            io::AsyncWriteExt,
365            net::{UnixDatagram, UnixStream},
366            task::yield_now,
367        },
368        tokio_util::codec::{FramedWrite, LinesCodec},
369    };
370
371    use super::{SocketConfig, tcp::TcpConfig, udp::UdpConfig};
372    use crate::{
373        SourceSender,
374        config::{ComponentKey, GlobalOptions, SourceConfig, SourceContext, log_schema},
375        event::{Event, LogEvent},
376        shutdown::{ShutdownSignal, SourceShutdownCoordinator},
377        sinks::util::tcp::TcpSinkConfig,
378        sources::util::net::SocketListenAddr,
379        test_util::{
380            collect_n, collect_n_limited,
381            components::{
382                COMPONENT_ERROR_TAGS, SOCKET_PUSH_SOURCE_TAGS, assert_source_compliance,
383                assert_source_error,
384            },
385            random_string, send_lines, send_lines_tls, wait_for_tcp,
386        },
387        tls::{self, TlsConfig, TlsEnableableConfig, TlsSourceConfig},
388    };
389
390    type Guard<'a> = MutexGuard<'a, ()>;
391
392    async fn wait_for_tcp_and_release<'a>(guard: Guard<'a>, addr: SocketAddr) {
393        wait_for_tcp(addr).await;
394        drop(guard) // Now we're sure the socket was bound by the server and we can release the lock
395    }
396
397    static ADDR_LOCK: LazyLock<Mutex<()>> = LazyLock::new(|| Mutex::new(()));
398    pub async fn next_addr_for_ip<'a>(ip: IpAddr) -> (Guard<'a>, SocketAddr) {
399        let guard = ADDR_LOCK.lock().await;
400        let port = pick_unused_port(ip);
401        (guard, SocketAddr::new(ip, port))
402    }
403
404    pub async fn next_addr_any<'a>() -> (Guard<'a>, SocketAddr) {
405        next_addr_for_ip(IpAddr::V4(Ipv4Addr::UNSPECIFIED)).await
406    }
407
408    pub async fn next_addr<'a>() -> (Guard<'a>, SocketAddr) {
409        next_addr_for_ip(IpAddr::V4(Ipv4Addr::LOCALHOST)).await
410    }
411
412    pub fn bind_unused_udp() -> UdpSocket {
413        portpicker::bind_unused_udp(IpAddr::V4(Ipv4Addr::LOCALHOST))
414    }
415
416    pub fn bind_unused_udp_any() -> UdpSocket {
417        portpicker::bind_unused_udp(IpAddr::V4(Ipv4Addr::UNSPECIFIED))
418    }
419
420    fn get_gelf_payload(message: &str) -> String {
421        serde_json::to_string(&json!({
422            "version": "1.1",
423            "host": "example.org",
424            "short_message": message,
425            "timestamp": 1234567890.123,
426            "level": 6,
427            "_foo": "bar",
428        }))
429        .unwrap()
430    }
431
432    fn create_gelf_chunk(
433        message_id: u64,
434        sequence_number: u8,
435        total_chunks: u8,
436        payload: &[u8],
437    ) -> Bytes {
438        const GELF_MAGIC: [u8; 2] = [0x1e, 0x0f];
439        let mut chunk = BytesMut::new();
440        chunk.put_slice(&GELF_MAGIC);
441        chunk.put_u64(message_id);
442        chunk.put_u8(sequence_number);
443        chunk.put_u8(total_chunks);
444        chunk.put(payload);
445        chunk.freeze()
446    }
447
448    fn get_gelf_chunks(short_message: &str, max_size: usize, rng: &mut SmallRng) -> Vec<Bytes> {
449        let message_id = rand::random();
450        let payload = get_gelf_payload(short_message);
451        let payload_chunks = payload.as_bytes().chunks(max_size).collect::<Vec<_>>();
452        let total_chunks = payload_chunks.len();
453        assert!(total_chunks <= 128, "too many gelf chunks");
454
455        let mut chunks = payload_chunks
456            .into_iter()
457            .enumerate()
458            .map(|(i, payload_chunk)| {
459                create_gelf_chunk(message_id, i as u8, total_chunks as u8, payload_chunk)
460            })
461            .collect::<Vec<_>>();
462        // Shuffle the chunks to simulate out-of-order delivery
463        chunks.shuffle(rng);
464        chunks
465    }
466
467    #[test]
468    fn generate_config() {
469        crate::test_util::test_generate_config::<SocketConfig>();
470    }
471
472    //////// TCP TESTS ////////
473    #[tokio::test]
474    async fn tcp_it_includes_host() {
475        assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async {
476            let (tx, mut rx) = SourceSender::new_test();
477            let (guard, addr) = next_addr().await;
478
479            let server = SocketConfig::from(TcpConfig::from_address(addr.into()))
480                .build(SourceContext::new_test(tx, None))
481                .await
482                .unwrap();
483            tokio::spawn(server);
484
485            wait_for_tcp_and_release(guard, addr).await;
486
487            let addr = send_lines(addr, vec!["test".to_owned()].into_iter())
488                .await
489                .unwrap();
490
491            let event = rx.next().await.unwrap();
492
493            assert_eq!(event.as_log()["host"], addr.ip().to_string().into());
494            assert_eq!(event.as_log()["port"], addr.port().into());
495        })
496        .await;
497    }
498
499    #[tokio::test]
500    async fn tcp_it_includes_vector_namespaced_fields() {
501        assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async {
502            let (tx, mut rx) = SourceSender::new_test();
503            let (guard, addr) = next_addr().await;
504            let mut conf = TcpConfig::from_address(addr.into());
505            conf.set_log_namespace(Some(true));
506
507            let server = SocketConfig::from(conf)
508                .build(SourceContext::new_test(tx, None))
509                .await
510                .unwrap();
511            tokio::spawn(server);
512
513            wait_for_tcp_and_release(guard, addr).await;
514
515            let addr = send_lines(addr, vec!["test".to_owned()].into_iter())
516                .await
517                .unwrap();
518
519            let event = rx.next().await.unwrap();
520            let log = event.as_log();
521            let event_meta = log.metadata().value();
522
523            assert_eq!(log.value(), &"test".into());
524            assert_eq!(
525                event_meta.get(path!("vector", "source_type")).unwrap(),
526                &value!(SocketConfig::NAME)
527            );
528            assert_eq!(
529                event_meta.get(path!(SocketConfig::NAME, "host")).unwrap(),
530                &value!(addr.ip().to_string())
531            );
532            assert_eq!(
533                event_meta.get(path!(SocketConfig::NAME, "port")).unwrap(),
534                &value!(addr.port())
535            );
536        })
537        .await;
538    }
539
540    #[tokio::test]
541    async fn tcp_splits_on_newline() {
542        assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async {
543            let (tx, rx) = SourceSender::new_test();
544            let (guard, addr) = next_addr().await;
545
546            let server = SocketConfig::from(TcpConfig::from_address(addr.into()))
547                .build(SourceContext::new_test(tx, None))
548                .await
549                .unwrap();
550            tokio::spawn(server);
551
552            wait_for_tcp_and_release(guard, addr).await;
553
554            send_lines(addr, vec!["foo\nbar".to_owned()].into_iter())
555                .await
556                .unwrap();
557
558            let events = collect_n(rx, 2).await;
559
560            assert_eq!(events.len(), 2);
561            assert_eq!(
562                events[0].as_log()[log_schema().message_key().unwrap().to_string()],
563                "foo".into()
564            );
565            assert_eq!(
566                events[1].as_log()[log_schema().message_key().unwrap().to_string()],
567                "bar".into()
568            );
569        })
570        .await;
571    }
572
573    #[tokio::test]
574    async fn tcp_it_includes_source_type() {
575        assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async {
576            let (tx, mut rx) = SourceSender::new_test();
577            let (guard, addr) = next_addr().await;
578
579            let server = SocketConfig::from(TcpConfig::from_address(addr.into()))
580                .build(SourceContext::new_test(tx, None))
581                .await
582                .unwrap();
583            tokio::spawn(server);
584
585            wait_for_tcp_and_release(guard, addr).await;
586            send_lines(addr, vec!["test".to_owned()].into_iter())
587                .await
588                .unwrap();
589
590            let event = rx.next().await.unwrap();
591            assert_eq!(
592                event.as_log()[log_schema().source_type_key().unwrap().to_string()],
593                "socket".into()
594            );
595        })
596        .await;
597    }
598
599    #[tokio::test]
600    async fn tcp_continue_after_long_line() {
601        assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async {
602            let (tx, mut rx) = SourceSender::new_test();
603            let (guard, addr) = next_addr().await;
604
605            let mut config = TcpConfig::from_address(addr.into());
606            config.set_framing(Some(
607                NewlineDelimitedDecoderConfig::new_with_max_length(10).into(),
608            ));
609
610            let server = SocketConfig::from(config)
611                .build(SourceContext::new_test(tx, None))
612                .await
613                .unwrap();
614            tokio::spawn(server);
615
616            let lines = vec![
617                "short".to_owned(),
618                "this is too long".to_owned(),
619                "more short".to_owned(),
620            ];
621
622            wait_for_tcp_and_release(guard, addr).await;
623            send_lines(addr, lines.into_iter()).await.unwrap();
624
625            let event = rx.next().await.unwrap();
626            assert_eq!(
627                event.as_log()[log_schema().message_key().unwrap().to_string()],
628                "short".into()
629            );
630
631            let event = rx.next().await.unwrap();
632            assert_eq!(
633                event.as_log()[log_schema().message_key().unwrap().to_string()],
634                "more short".into()
635            );
636        })
637        .await;
638    }
639
640    #[tokio::test]
641    async fn tcp_with_tls() {
642        assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async {
643            let (tx, mut rx) = SourceSender::new_test();
644            let (guard, addr) = next_addr().await;
645
646            let mut config = TcpConfig::from_address(addr.into());
647            config.set_tls(Some(TlsSourceConfig {
648                tls_config: TlsEnableableConfig {
649                    enabled: Some(true),
650                    options: TlsConfig {
651                        verify_certificate: Some(true),
652                        crt_file: Some(tls::TEST_PEM_CRT_PATH.into()),
653                        key_file: Some(tls::TEST_PEM_KEY_PATH.into()),
654                        ca_file: Some(tls::TEST_PEM_CA_PATH.into()),
655                        ..Default::default()
656                    },
657                },
658                client_metadata_key: Some(OptionalValuePath::from(owned_value_path!("tls_peer"))),
659            }));
660
661            let server = SocketConfig::from(config)
662                .build(SourceContext::new_test(tx, None))
663                .await
664                .unwrap();
665            tokio::spawn(server);
666
667            let lines = vec!["one line".to_owned(), "another line".to_owned()];
668
669            wait_for_tcp_and_release(guard, addr).await;
670            send_lines_tls(
671                addr,
672                "localhost".into(),
673                lines.into_iter(),
674                std::path::Path::new(tls::TEST_PEM_CA_PATH),
675                std::path::Path::new(tls::TEST_PEM_CLIENT_CRT_PATH),
676                std::path::Path::new(tls::TEST_PEM_CLIENT_KEY_PATH),
677            )
678            .await
679            .unwrap();
680
681            let event = rx.next().await.unwrap();
682            assert_eq!(
683                event.as_log()[log_schema().message_key().unwrap().to_string()],
684                "one line".into()
685            );
686
687            let tls_meta: ObjectMap = btreemap!(
688                "subject" => "CN=localhost,OU=Vector,O=Datadog,L=New York,ST=New York,C=US"
689            );
690
691            assert_eq!(event.as_log()["tls_peer"], tls_meta.clone().into(),);
692
693            let event = rx.next().await.unwrap();
694            assert_eq!(
695                event.as_log()[log_schema().message_key().unwrap().to_string()],
696                "another line".into()
697            );
698
699            assert_eq!(event.as_log()["tls_peer"], tls_meta.clone().into(),);
700        })
701        .await;
702    }
703
704    #[tokio::test]
705    async fn tcp_with_tls_vector_namespace() {
706        assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async {
707            let (tx, mut rx) = SourceSender::new_test();
708            let (guard, addr) = next_addr().await;
709
710            let mut config = TcpConfig::from_address(addr.into());
711            config.set_tls(Some(TlsSourceConfig {
712                tls_config: TlsEnableableConfig {
713                    enabled: Some(true),
714                    options: TlsConfig {
715                        verify_certificate: Some(true),
716                        crt_file: Some(tls::TEST_PEM_CRT_PATH.into()),
717                        key_file: Some(tls::TEST_PEM_KEY_PATH.into()),
718                        ca_file: Some(tls::TEST_PEM_CA_PATH.into()),
719                        ..Default::default()
720                    },
721                },
722                client_metadata_key: None,
723            }));
724            config.log_namespace = Some(true);
725
726            let server = SocketConfig::from(config)
727                .build(SourceContext::new_test(tx, None))
728                .await
729                .unwrap();
730            tokio::spawn(server);
731
732            let lines = vec!["one line".to_owned(), "another line".to_owned()];
733
734            wait_for_tcp_and_release(guard, addr).await;
735            send_lines_tls(
736                addr,
737                "localhost".into(),
738                lines.into_iter(),
739                std::path::Path::new(tls::TEST_PEM_CA_PATH),
740                std::path::Path::new(tls::TEST_PEM_CLIENT_CRT_PATH),
741                std::path::Path::new(tls::TEST_PEM_CLIENT_KEY_PATH),
742            )
743            .await
744            .unwrap();
745
746            let event = rx.next().await.unwrap();
747            let log = event.as_log();
748            let event_meta = log.metadata().value();
749
750            assert_eq!(log.value(), &"one line".into());
751
752            let tls_meta: ObjectMap = btreemap!(
753                "subject" => "CN=localhost,OU=Vector,O=Datadog,L=New York,ST=New York,C=US"
754            );
755
756            assert_eq!(
757                event_meta
758                    .get(path!(SocketConfig::NAME, "tls_client_metadata"))
759                    .unwrap(),
760                &value!(tls_meta.clone())
761            );
762
763            let event = rx.next().await.unwrap();
764            let log = event.as_log();
765            let event_meta = log.metadata().value();
766
767            assert_eq!(log.value(), &"another line".into());
768
769            assert_eq!(
770                event_meta
771                    .get(path!(SocketConfig::NAME, "tls_client_metadata"))
772                    .unwrap(),
773                &value!(tls_meta.clone())
774            );
775        })
776        .await;
777    }
778
779    #[tokio::test]
780    async fn tcp_shutdown_simple() {
781        assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async {
782            let source_id = ComponentKey::from("tcp_shutdown_simple");
783            let (tx, mut rx) = SourceSender::new_test();
784            let (guard, addr) = next_addr().await;
785            let (cx, mut shutdown) = SourceContext::new_shutdown(&source_id, tx);
786
787            // Start TCP Source
788            let server = SocketConfig::from(TcpConfig::from_address(addr.into()))
789                .build(cx)
790                .await
791                .unwrap();
792            let source_handle = tokio::spawn(server);
793
794            // Send data to Source.
795            wait_for_tcp_and_release(guard, addr).await;
796            send_lines(addr, vec!["test".to_owned()].into_iter())
797                .await
798                .unwrap();
799
800            let event = rx.next().await.unwrap();
801            assert_eq!(
802                event.as_log()[log_schema().message_key().unwrap().to_string()],
803                "test".into()
804            );
805
806            // Now signal to the Source to shut down.
807            let deadline = Instant::now() + Duration::from_secs(10);
808            let shutdown_complete = shutdown.shutdown_source(&source_id, deadline);
809            let shutdown_success = shutdown_complete.await;
810            assert!(shutdown_success);
811
812            // Ensure source actually shut down successfully.
813            _ = source_handle.await.unwrap();
814        })
815        .await;
816    }
817
818    // Intentially not using assert_source_compliance here because this is a round-trip test which
819    // means source and sink will both emit `EventsSent` , triggering multi-emission check.
820    #[tokio::test]
821    async fn tcp_shutdown_infinite_stream() {
822        // We create our TCP source with a larger-than-normal send buffer, which helps ensure that
823        // the source doesn't block on sending the events downstream, otherwise if it was blocked on
824        // doing so, it wouldn't be able to wake up and loop to see that it had been signalled to
825        // shutdown.
826        let (guard, addr) = next_addr().await;
827
828        let (source_tx, source_rx) = SourceSender::new_test_sender_with_buffer(10_000);
829        let source_key = ComponentKey::from("tcp_shutdown_infinite_stream");
830        let (source_cx, mut shutdown) = SourceContext::new_shutdown(&source_key, source_tx);
831
832        let mut source_config = TcpConfig::from_address(addr.into());
833        source_config.set_shutdown_timeout_secs(1);
834        let source_task = SocketConfig::from(source_config)
835            .build(source_cx)
836            .await
837            .unwrap();
838
839        // Spawn the source task and wait until we're sure it's listening:
840        let source_handle = tokio::spawn(source_task);
841        wait_for_tcp_and_release(guard, addr).await;
842
843        // Now we create a TCP _sink_ which we'll feed with an infinite stream of events to ship to
844        // our TCP source.  This will ensure that our TCP source is fully-loaded as we try to shut
845        // it down, exercising the logic we have to ensure timely shutdown even under load:
846        let message = random_string(512);
847        let message_bytes = Bytes::from(message.clone());
848
849        #[derive(Clone, Debug)]
850        struct Serializer {
851            bytes: Bytes,
852        }
853        impl tokio_util::codec::Encoder<Event> for Serializer {
854            type Error = vector_lib::codecs::encoding::Error;
855
856            fn encode(&mut self, _: Event, buffer: &mut BytesMut) -> Result<(), Self::Error> {
857                buffer.put(self.bytes.as_ref());
858                buffer.put_u8(b'\n');
859                Ok(())
860            }
861        }
862        let sink_config = TcpSinkConfig::from_address(format!("localhost:{}", addr.port()));
863        let encoder = Serializer {
864            bytes: message_bytes,
865        };
866        let (sink, _healthcheck) = sink_config.build(Default::default(), encoder).unwrap();
867
868        tokio::spawn(async move {
869            let input = stream::repeat_with(|| LogEvent::default().into()).boxed();
870            sink.run(input).await.unwrap();
871        });
872
873        // Now with our sink running, feeding events to the source, collect 100 event arrays from
874        // the source and make sure each event within them matches the single message we repeatedly
875        // sent via the sink:
876        let events = collect_n_limited(source_rx, 100)
877            .await
878            .into_iter()
879            .collect::<Vec<_>>();
880        assert_eq!(100, events.len());
881
882        let message_key = log_schema().message_key().unwrap().to_string();
883        let expected_message = message.clone().into();
884        for event in events.into_iter().flat_map(EventContainer::into_events) {
885            assert_eq!(event.as_log()[message_key.as_str()], expected_message);
886        }
887
888        // Now trigger shutdown on the source and ensure that it shuts down before or at the
889        // deadline, and make sure the source task actually finished as well:
890        let shutdown_timeout_limit = Duration::from_secs(10);
891        let deadline = Instant::now() + shutdown_timeout_limit;
892        let shutdown_complete = shutdown.shutdown_source(&source_key, deadline);
893
894        let shutdown_result = timeout(shutdown_timeout_limit, shutdown_complete).await;
895        assert_eq!(shutdown_result, Ok(true));
896
897        let source_result = source_handle.await.expect("source task should not panic");
898        assert_eq!(source_result, Ok(()));
899    }
900
901    #[tokio::test]
902    async fn tcp_connection_close_after_max_duration() {
903        let (tx, _) = SourceSender::new_test();
904        let (guard, addr) = next_addr().await;
905
906        let mut source_config = TcpConfig::from_address(addr.into());
907        source_config.set_max_connection_duration_secs(Some(1));
908        let source_task = SocketConfig::from(source_config)
909            .build(SourceContext::new_test(tx, None))
910            .await
911            .unwrap();
912
913        // Spawn the source task and wait until we're sure it's listening:
914        drop(tokio::spawn(source_task));
915        wait_for_tcp_and_release(guard, addr).await;
916
917        let mut stream: TcpStream = TcpStream::connect(addr)
918            .await
919            .expect("stream should be able to connect");
920        let start = Instant::now();
921
922        let timeout = tokio::time::sleep(Duration::from_millis(1200));
923        let mut buffer = [0u8; 10];
924
925        tokio::select! {
926             _ = timeout => {
927                 panic!("timed out waiting for stream to close")
928             },
929             read_result = stream.read(&mut buffer) => {
930                 match read_result {
931                    // read resulting with 0 bytes -> the connection was closed
932                    Ok(0) => assert_relative_eq!(start.elapsed().as_secs_f64(), 1.0, epsilon = 0.3),
933                    Ok(_) => panic!("unexpectedly read data from stream"),
934                    Err(e) => panic!("{e:}")
935                 }
936             }
937        }
938    }
939
940    //////// UDP TESTS ////////
941    async fn send_lines_udp(to: SocketAddr, lines: impl IntoIterator<Item = String>) -> UdpSocket {
942        send_lines_udp_from(bind_unused_udp(), to, lines)
943    }
944
945    fn send_lines_udp_from(
946        from: UdpSocket,
947        to: SocketAddr,
948        lines: impl IntoIterator<Item = String>,
949    ) -> UdpSocket {
950        send_packets_udp_from(from, to, lines.into_iter().map(|line| line.into()))
951    }
952
953    async fn send_packets_udp(
954        to: SocketAddr,
955        packets: impl IntoIterator<Item = Bytes>,
956    ) -> UdpSocket {
957        send_packets_udp_from(bind_unused_udp(), to, packets)
958    }
959
960    fn send_packets_udp_from(
961        from: UdpSocket,
962        to: SocketAddr,
963        packets: impl IntoIterator<Item = Bytes>,
964    ) -> UdpSocket {
965        for packet in packets {
966            assert_eq!(
967                from.send_to(&packet, to)
968                    .map_err(|error| panic!("{error:}"))
969                    .ok()
970                    .unwrap(),
971                packet.len()
972            );
973            // Space things out slightly to try to avoid dropped packets
974            thread::sleep(Duration::from_millis(1));
975        }
976
977        // Give packets some time to flow through
978        thread::sleep(Duration::from_millis(10));
979
980        // Done
981        from
982    }
983
984    async fn init_udp_with_shutdown(
985        sender: SourceSender,
986        source_id: &ComponentKey,
987        shutdown: &mut SourceShutdownCoordinator,
988    ) -> (SocketAddr, JoinHandle<Result<(), ()>>) {
989        let (shutdown_signal, _) = shutdown.register_source(source_id, false);
990        init_udp_inner(sender, source_id, shutdown_signal, None, false).await
991    }
992
993    async fn init_udp(sender: SourceSender, use_log_namespace: bool) -> SocketAddr {
994        init_udp_inner(
995            sender,
996            &ComponentKey::from("default"),
997            ShutdownSignal::noop(),
998            None,
999            use_log_namespace,
1000        )
1001        .await
1002        .0
1003    }
1004
1005    async fn init_udp_with_config(sender: SourceSender, config: UdpConfig) -> SocketAddr {
1006        init_udp_inner(
1007            sender,
1008            &ComponentKey::from("default"),
1009            ShutdownSignal::noop(),
1010            Some(config),
1011            false,
1012        )
1013        .await
1014        .0
1015    }
1016
1017    async fn init_udp_inner(
1018        sender: SourceSender,
1019        source_key: &ComponentKey,
1020        shutdown_signal: ShutdownSignal,
1021        config: Option<UdpConfig>,
1022        use_vector_namespace: bool,
1023    ) -> (SocketAddr, JoinHandle<Result<(), ()>>) {
1024        let (guard, address, mut config) = match config {
1025            Some(config) => match config.address() {
1026                SocketListenAddr::SocketAddr(addr) => (None, addr, config),
1027                _ => panic!("listen address should not be systemd FD offset in tests"),
1028            },
1029            None => {
1030                let (guard, address) = next_addr().await;
1031                (
1032                    Some(guard),
1033                    address,
1034                    UdpConfig::from_address(address.into()),
1035                )
1036            }
1037        };
1038
1039        let config = if use_vector_namespace {
1040            config.set_log_namespace(Some(true));
1041            config
1042        } else {
1043            config
1044        };
1045
1046        let server = SocketConfig::from(config)
1047            .build(SourceContext {
1048                key: source_key.clone(),
1049                globals: GlobalOptions::default(),
1050                enrichment_tables: Default::default(),
1051                shutdown: shutdown_signal,
1052                out: sender,
1053                proxy: Default::default(),
1054                acknowledgements: false,
1055                schema: Default::default(),
1056                schema_definitions: HashMap::default(),
1057                extra_context: Default::default(),
1058            })
1059            .await
1060            .unwrap();
1061        let source_handle = tokio::spawn(server);
1062
1063        // Wait for UDP to start listening
1064        tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
1065
1066        if let Some(guard) = guard {
1067            drop(guard)
1068        }
1069
1070        (address, source_handle)
1071    }
1072
1073    #[tokio::test]
1074    async fn udp_message() {
1075        assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async {
1076            let (tx, rx) = SourceSender::new_test();
1077            let address = init_udp(tx, false).await;
1078
1079            send_lines_udp(address, vec!["test".to_string()]).await;
1080            let events = collect_n(rx, 1).await;
1081
1082            assert_eq!(
1083                events[0].as_log()[log_schema().message_key().unwrap().to_string()],
1084                "test".into()
1085            );
1086        })
1087        .await;
1088    }
1089
1090    #[tokio::test]
1091    async fn udp_message_preserves_newline() {
1092        assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async {
1093            let (tx, rx) = SourceSender::new_test();
1094            let address = init_udp(tx, false).await;
1095
1096            send_lines_udp(address, vec!["foo\nbar".to_string()]).await;
1097            let events = collect_n(rx, 1).await;
1098
1099            assert_eq!(
1100                events[0].as_log()[log_schema().message_key().unwrap().to_string()],
1101                "foo\nbar".into()
1102            );
1103        })
1104        .await;
1105    }
1106
1107    #[tokio::test]
1108    async fn udp_multiple_packets() {
1109        assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async {
1110            let (tx, rx) = SourceSender::new_test();
1111            let address = init_udp(tx, false).await;
1112
1113            send_lines_udp(address, vec!["test".to_string(), "test2".to_string()]).await;
1114            let events = collect_n(rx, 2).await;
1115
1116            assert_eq!(
1117                events[0].as_log()[log_schema().message_key().unwrap().to_string()],
1118                "test".into()
1119            );
1120            assert_eq!(
1121                events[1].as_log()[log_schema().message_key().unwrap().to_string()],
1122                "test2".into()
1123            );
1124        })
1125        .await;
1126    }
1127
1128    #[tokio::test]
1129    async fn udp_max_length() {
1130        assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async {
1131            let (tx, rx) = SourceSender::new_test();
1132            let (_, address) = next_addr().await;
1133            let mut config = UdpConfig::from_address(address.into());
1134            config.max_length = 11;
1135            let address = init_udp_with_config(tx, config).await;
1136
1137            send_lines_udp(
1138                address,
1139                vec![
1140                    "short line".to_string(),
1141                    "test with a long line".to_string(),
1142                    "a short un".to_string(),
1143                ],
1144            )
1145            .await;
1146
1147            let events = collect_n(rx, 2).await;
1148            assert_eq!(
1149                events[0].as_log()[log_schema().message_key().unwrap().to_string()],
1150                "short line".into()
1151            );
1152            assert_eq!(
1153                events[1].as_log()[log_schema().message_key().unwrap().to_string()],
1154                "a short un".into()
1155            );
1156        })
1157        .await;
1158    }
1159
1160    #[cfg(unix)]
1161    #[tokio::test]
1162    /// This test only works on Unix.
1163    /// Unix truncates at max_length giving us the bytes to get the first n delimited messages.
1164    /// Windows will drop the entire packet if we exceed the max_length so we are unable to
1165    /// extract anything.
1166    async fn udp_max_length_delimited() {
1167        assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async {
1168            let (tx, rx) = SourceSender::new_test();
1169            let (_, address) = next_addr().await;
1170            let mut config = UdpConfig::from_address(address.into());
1171            config.max_length = 10;
1172            config.framing = Some(
1173                CharacterDelimitedDecoderConfig {
1174                    character_delimited: CharacterDelimitedDecoderOptions::new(b',', None),
1175                }
1176                .into(),
1177            );
1178            let address = init_udp_with_config(tx, config).await;
1179
1180            send_lines_udp(
1181                address,
1182                vec!["test with, long line".to_string(), "short one".to_string()],
1183            )
1184            .await;
1185
1186            let events = collect_n(rx, 2).await;
1187            assert_eq!(
1188                events[0].as_log()[log_schema().message_key().unwrap().to_string()],
1189                "test with".into()
1190            );
1191            assert_eq!(
1192                events[1].as_log()[log_schema().message_key().unwrap().to_string()],
1193                "short one".into()
1194            );
1195        })
1196        .await;
1197    }
1198
1199    #[tokio::test]
1200    async fn udp_decodes_chunked_gelf_messages() {
1201        assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async {
1202            let (tx, rx) = SourceSender::new_test();
1203            let (_, address) = next_addr().await;
1204            let mut config = UdpConfig::from_address(address.into());
1205            config.decoding = GelfDeserializerConfig::default().into();
1206            let address = init_udp_with_config(tx, config).await;
1207            let seed = 42;
1208            let mut rng = SmallRng::seed_from_u64(seed);
1209            let max_size = 300;
1210            let big_message = "This is a very large message".repeat(500);
1211            let another_big_message = "This is another very large message".repeat(500);
1212            let mut chunks = get_gelf_chunks(big_message.as_str(), max_size, &mut rng);
1213            let mut another_chunks =
1214                get_gelf_chunks(another_big_message.as_str(), max_size, &mut rng);
1215            chunks.append(&mut another_chunks);
1216            chunks.shuffle(&mut rng);
1217
1218            send_packets_udp(address, chunks).await;
1219
1220            let events = collect_n(rx, 2).await;
1221            assert_eq!(
1222                events[1].as_log()[log_schema().message_key().unwrap().to_string()],
1223                big_message.into()
1224            );
1225            assert_eq!(
1226                events[0].as_log()[log_schema().message_key().unwrap().to_string()],
1227                another_big_message.into()
1228            );
1229        })
1230        .await;
1231    }
1232
1233    #[tokio::test]
1234    async fn udp_it_includes_host() {
1235        assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async {
1236            let (tx, rx) = SourceSender::new_test();
1237            let address = init_udp(tx, false).await;
1238
1239            let from = send_lines_udp(address, vec!["test".to_string()]).await;
1240            let events = collect_n(rx, 1).await;
1241
1242            assert_eq!(
1243                events[0].as_log()["host"],
1244                from.local_addr().unwrap().ip().to_string().into()
1245            );
1246            assert_eq!(
1247                events[0].as_log()["port"],
1248                from.local_addr().unwrap().port().into()
1249            );
1250        })
1251        .await;
1252    }
1253
1254    #[tokio::test]
1255    async fn udp_it_includes_vector_namespaced_fields() {
1256        assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async {
1257            let (tx, rx) = SourceSender::new_test();
1258            let address = init_udp(tx, true).await;
1259
1260            let from = send_lines_udp(address, vec!["test".to_string()]).await;
1261            let events = collect_n(rx, 1).await;
1262            let log = events[0].as_log();
1263            let event_meta = log.metadata().value();
1264
1265            assert_eq!(log.value(), &"test".into());
1266            assert_eq!(
1267                event_meta.get(path!("vector", "source_type")).unwrap(),
1268                &value!(SocketConfig::NAME)
1269            );
1270            assert_eq!(
1271                event_meta.get(path!(SocketConfig::NAME, "host")).unwrap(),
1272                &value!(from.local_addr().unwrap().ip().to_string())
1273            );
1274            assert_eq!(
1275                event_meta.get(path!(SocketConfig::NAME, "port")).unwrap(),
1276                &value!(from.local_addr().unwrap().port())
1277            );
1278        })
1279        .await;
1280    }
1281
1282    #[tokio::test]
1283    async fn udp_it_includes_source_type() {
1284        assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async {
1285            let (tx, rx) = SourceSender::new_test();
1286            let address = init_udp(tx, false).await;
1287
1288            _ = send_lines_udp(address, vec!["test".to_string()]).await;
1289            let events = collect_n(rx, 1).await;
1290
1291            assert_eq!(
1292                events[0].as_log()[log_schema().source_type_key().unwrap().to_string()],
1293                "socket".into()
1294            );
1295        })
1296        .await;
1297    }
1298
1299    #[tokio::test]
1300    async fn udp_shutdown_simple() {
1301        assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async {
1302            let (tx, rx) = SourceSender::new_test();
1303            let source_id = ComponentKey::from("udp_shutdown_simple");
1304
1305            let mut shutdown = SourceShutdownCoordinator::default();
1306            let (address, source_handle) =
1307                init_udp_with_shutdown(tx, &source_id, &mut shutdown).await;
1308
1309            send_lines_udp(address, vec!["test".to_string()]).await;
1310            let events = collect_n(rx, 1).await;
1311
1312            assert_eq!(
1313                events[0].as_log()[log_schema().message_key().unwrap().to_string()],
1314                "test".into()
1315            );
1316
1317            // Now signal to the Source to shut down.
1318            let deadline = Instant::now() + Duration::from_secs(10);
1319            let shutdown_complete = shutdown.shutdown_source(&source_id, deadline);
1320            let shutdown_success = shutdown_complete.await;
1321            assert!(shutdown_success);
1322
1323            // Ensure source actually shut down successfully.
1324            _ = source_handle.await.unwrap();
1325        })
1326        .await;
1327    }
1328
1329    #[tokio::test]
1330    async fn udp_shutdown_infinite_stream() {
1331        assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async {
1332            let (tx, rx) = SourceSender::new_test();
1333            let source_id = ComponentKey::from("udp_shutdown_infinite_stream");
1334
1335            let mut shutdown = SourceShutdownCoordinator::default();
1336            let (address, source_handle) =
1337                init_udp_with_shutdown(tx, &source_id, &mut shutdown).await;
1338
1339            // Stream that keeps sending lines to the UDP source forever.
1340            let run_pump_atomic_sender = Arc::new(AtomicBool::new(true));
1341            let run_pump_atomic_receiver = Arc::clone(&run_pump_atomic_sender);
1342            let pump_handle = tokio::task::spawn_blocking(move || {
1343                let handle = tokio::runtime::Handle::current();
1344                handle.block_on(send_lines_udp(
1345                    address,
1346                    std::iter::repeat("test".to_string())
1347                        .take_while(move |_| run_pump_atomic_receiver.load(Ordering::Relaxed)),
1348                ));
1349            });
1350
1351            // Important that 'rx' doesn't get dropped until the pump has finished sending items to it.
1352            let events = collect_n(rx, 100).await;
1353            assert_eq!(100, events.len());
1354            for event in events {
1355                assert_eq!(
1356                    event.as_log()[log_schema().message_key().unwrap().to_string()],
1357                    "test".into()
1358                );
1359            }
1360
1361            let deadline = Instant::now() + Duration::from_secs(10);
1362            let shutdown_complete = shutdown.shutdown_source(&source_id, deadline);
1363            let shutdown_success = shutdown_complete.await;
1364            assert!(shutdown_success);
1365
1366            // Ensure that the source has actually shut down.
1367            _ = source_handle.await.unwrap();
1368
1369            // Stop the pump from sending lines forever.
1370            run_pump_atomic_sender.store(false, Ordering::Relaxed);
1371            assert!(pump_handle.await.is_ok());
1372        })
1373        .await;
1374    }
1375
1376    #[tokio::test]
1377    async fn multicast_udp_message() {
1378        assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async {
1379            let (tx, mut rx) = SourceSender::new_test();
1380            // The socket address must be `IPADDR_ANY` (0.0.0.0) in order to receive multicast packets
1381            let (guard, socket_address) = next_addr_any().await;
1382            let multicast_ip_address: Ipv4Addr = "224.0.0.2".parse().unwrap();
1383            let multicast_socket_address =
1384                SocketAddr::new(IpAddr::V4(multicast_ip_address), socket_address.port());
1385            let mut config = UdpConfig::from_address(socket_address.into());
1386            config.multicast_groups = vec![multicast_ip_address];
1387            init_udp_with_config(tx, config).await;
1388            drop(guard);
1389
1390            // We must send packets to the same interface the `socket_address` is bound to
1391            // in order to receive the multicast packets the `from` socket sends.
1392            // To do so, we use the `IPADDR_ANY` address
1393            send_lines_udp_from(
1394                bind_unused_udp_any(),
1395                multicast_socket_address,
1396                ["test".to_string()],
1397            );
1398
1399            let event = rx.next().await.expect("must receive an event");
1400            assert_eq!(
1401                event.as_log()[log_schema().message_key().unwrap().to_string()],
1402                "test".into()
1403            );
1404        })
1405        .await;
1406    }
1407
1408    #[tokio::test]
1409    async fn multiple_multicast_addresses_udp_message() {
1410        assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async {
1411            let (tx, mut rx) = SourceSender::new_test();
1412            let (guard, socket_address) = next_addr_any().await;
1413            let multicast_ip_addresses = (2..12)
1414                .map(|i| format!("224.0.0.{i}").parse().unwrap())
1415                .collect::<Vec<Ipv4Addr>>();
1416            let multicast_ip_socket_addresses = multicast_ip_addresses
1417                .iter()
1418                .map(|ip_address| SocketAddr::new(IpAddr::V4(*ip_address), socket_address.port()))
1419                .collect::<Vec<SocketAddr>>();
1420            let mut config = UdpConfig::from_address(socket_address.into());
1421            config.multicast_groups = multicast_ip_addresses;
1422            init_udp_with_config(tx, config).await;
1423            drop(guard);
1424
1425            let mut from = bind_unused_udp_any();
1426            for multicast_ip_socket_address in multicast_ip_socket_addresses {
1427                from = send_lines_udp_from(
1428                    from,
1429                    multicast_ip_socket_address,
1430                    [multicast_ip_socket_address.to_string()],
1431                );
1432
1433                let event = rx.next().await.expect("must receive an event");
1434                assert_eq!(
1435                    event.as_log()[log_schema().message_key().unwrap().to_string()],
1436                    multicast_ip_socket_address.to_string().into()
1437                );
1438            }
1439        })
1440        .await;
1441    }
1442
1443    #[tokio::test]
1444    async fn multicast_and_unicast_udp_message() {
1445        assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async {
1446            let (tx, mut rx) = SourceSender::new_test();
1447            let (guard, socket_address) = next_addr_any().await;
1448            let multicast_ip_address: Ipv4Addr = "224.0.0.2".parse().unwrap();
1449            let multicast_socket_address =
1450                SocketAddr::new(IpAddr::V4(multicast_ip_address), socket_address.port());
1451            let mut config = UdpConfig::from_address(socket_address.into());
1452            config.multicast_groups = vec![multicast_ip_address];
1453            init_udp_with_config(tx, config).await;
1454            drop(guard);
1455
1456            let from = bind_unused_udp_any();
1457            // Send packet to multicast address
1458            let from = send_lines_udp_from(from, multicast_socket_address, ["test".to_string()]);
1459            let event = rx.next().await.expect("must receive an event");
1460            assert_eq!(
1461                event.as_log()[log_schema().message_key().unwrap().to_string()],
1462                "test".into()
1463            );
1464
1465            // Windows does not support connecting to `0.0.0.0`,
1466            // therefore we connect to `127.0.0.1` instead (the socket is listening at `0.0.0.0`)
1467            let to = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), socket_address.port());
1468            // Send packet to unicast address
1469            send_lines_udp_from(from, to, ["test".to_string()]);
1470            let event = rx.next().await.expect("must receive an event");
1471            assert_eq!(
1472                event.as_log()[log_schema().message_key().unwrap().to_string()],
1473                "test".into()
1474            );
1475        })
1476        .await;
1477    }
1478
1479    #[tokio::test]
1480    async fn udp_invalid_multicast_group() {
1481        assert_source_error(&COMPONENT_ERROR_TAGS, async {
1482            let (tx, _rx) = SourceSender::new_test();
1483            let (_, socket_address) = next_addr_any().await;
1484            let invalid_multicast_ip_address: Ipv4Addr = "192.168.0.3".parse().unwrap();
1485            let mut config = UdpConfig::from_address(socket_address.into());
1486            config.multicast_groups = vec![invalid_multicast_ip_address];
1487            init_udp_with_config(tx, config).await;
1488        })
1489        .await;
1490    }
1491
1492    ////////////// UNIX TEST LIBS //////////////
1493
1494    #[cfg(unix)]
1495    async fn init_unix(sender: SourceSender, stream: bool, use_vector_namespace: bool) -> PathBuf {
1496        init_unix_inner(sender, stream, use_vector_namespace, None).await
1497    }
1498
1499    #[cfg(unix)]
1500    async fn init_unix_with_config(
1501        sender: SourceSender,
1502        stream: bool,
1503        use_vector_namespace: bool,
1504        config: UnixConfig,
1505    ) -> PathBuf {
1506        init_unix_inner(sender, stream, use_vector_namespace, Some(config)).await
1507    }
1508
1509    #[cfg(unix)]
1510    async fn init_unix_inner(
1511        sender: SourceSender,
1512        stream: bool,
1513        use_vector_namespace: bool,
1514        config: Option<UnixConfig>,
1515    ) -> PathBuf {
1516        let mut config = config.unwrap_or_else(|| {
1517            UnixConfig::new(tempfile::tempdir().unwrap().keep().join("unix_test"))
1518        });
1519
1520        let in_path = config.path.clone();
1521
1522        if use_vector_namespace {
1523            config.log_namespace = Some(true);
1524        }
1525
1526        let mode = if stream {
1527            Mode::UnixStream(config)
1528        } else {
1529            Mode::UnixDatagram(config)
1530        };
1531
1532        let server = SocketConfig { mode }
1533            .build(SourceContext::new_test(sender, None))
1534            .await
1535            .unwrap();
1536        tokio::spawn(server);
1537
1538        // Wait for server to accept traffic
1539        while if stream {
1540            std::os::unix::net::UnixStream::connect(&in_path).is_err()
1541        } else {
1542            let socket = std::os::unix::net::UnixDatagram::unbound().unwrap();
1543            socket.connect(&in_path).is_err()
1544        } {
1545            yield_now().await;
1546        }
1547
1548        in_path
1549    }
1550
1551    #[cfg(unix)]
1552    async fn unix_send_lines(stream: bool, path: PathBuf, lines: &[&str]) {
1553        match stream {
1554            false => send_lines_unix_datagram(path, lines).await,
1555            true => send_lines_unix_stream(path, lines).await,
1556        }
1557    }
1558
1559    #[cfg(unix)]
1560    async fn unix_message(
1561        message: &str,
1562        stream: bool,
1563        use_vector_namespace: bool,
1564    ) -> (PathBuf, impl Stream<Item = Event> + use<>) {
1565        let (tx, rx) = SourceSender::new_test();
1566        let path = init_unix(tx, stream, use_vector_namespace).await;
1567        let path_clone = path.clone();
1568
1569        unix_send_lines(stream, path, &[message]).await;
1570
1571        (path_clone, rx)
1572    }
1573
1574    #[cfg(unix)]
1575    async fn unix_multiple_packets(stream: bool) {
1576        let (tx, rx) = SourceSender::new_test();
1577        let path = init_unix(tx, stream, false).await;
1578
1579        unix_send_lines(stream, path, &["test", "test2"]).await;
1580        let events = collect_n(rx, 2).await;
1581
1582        assert_eq!(2, events.len());
1583        assert_eq!(
1584            events[0].as_log()[log_schema().message_key().unwrap().to_string()],
1585            "test".into()
1586        );
1587        assert_eq!(
1588            events[1].as_log()[log_schema().message_key().unwrap().to_string()],
1589            "test2".into()
1590        );
1591    }
1592
1593    #[cfg(unix)]
1594    fn parses_unix_config(mode: &str) -> SocketConfig {
1595        toml::from_str::<SocketConfig>(&format!(
1596            r#"
1597               mode = "{mode}"
1598               path = "/does/not/exist"
1599            "#
1600        ))
1601        .unwrap()
1602    }
1603
1604    #[cfg(unix)]
1605    fn parses_unix_config_file_mode(mode: &str) -> SocketConfig {
1606        toml::from_str::<SocketConfig>(&format!(
1607            r#"
1608               mode = "{mode}"
1609               path = "/does/not/exist"
1610               socket_file_mode = 0o777
1611            "#
1612        ))
1613        .unwrap()
1614    }
1615
1616    ////////////// UNIX DATAGRAM TESTS //////////////
1617    #[cfg(unix)]
1618    async fn send_lines_unix_datagram(path: PathBuf, lines: &[&str]) {
1619        let packets = lines.iter().map(|line| Bytes::from(line.to_string()));
1620        send_packets_unix_datagram(path, packets).await;
1621    }
1622
1623    #[cfg(unix)]
1624    async fn send_packets_unix_datagram(path: PathBuf, packets: impl IntoIterator<Item = Bytes>) {
1625        let socket = UnixDatagram::unbound().unwrap();
1626        socket.connect(path).unwrap();
1627
1628        for packet in packets {
1629            socket.send(&packet).await.unwrap();
1630        }
1631        socket.shutdown(std::net::Shutdown::Both).unwrap();
1632    }
1633
1634    #[cfg(unix)]
1635    #[tokio::test]
1636    async fn unix_datagram_message() {
1637        assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async {
1638            let (_, rx) = unix_message("test", false, false).await;
1639            let events = collect_n(rx, 1).await;
1640
1641            assert_eq!(events.len(), 1);
1642            assert_eq!(
1643                events[0].as_log()[log_schema().message_key().unwrap().to_string()],
1644                "test".into()
1645            );
1646            assert_eq!(
1647                events[0].as_log()[log_schema().source_type_key().unwrap().to_string()],
1648                "socket".into()
1649            );
1650            assert_eq!(events[0].as_log()["host"], UNNAMED_SOCKET_HOST.into());
1651        })
1652        .await;
1653    }
1654
1655    #[ignore]
1656    #[cfg(unix)]
1657    #[tokio::test]
1658    async fn unix_datagram_socket_test() {
1659        // This test is useful for testing the behavior of datagram
1660        // sockets.
1661
1662        use tempfile::tempdir;
1663        use tokio::net::UnixDatagram;
1664
1665        let tmp = tempdir().unwrap();
1666
1667        let tx_path = tmp.path().join("tx");
1668
1669        // Switch this var between "bound" and "unbound" to test
1670        // different types of socket behavior.
1671        let tx_type = "bound";
1672
1673        let tx = if tx_type == "bound" {
1674            UnixDatagram::bind(&tx_path).unwrap()
1675        } else {
1676            UnixDatagram::unbound().unwrap()
1677        };
1678
1679        // The following debug statements showcase some useful info:
1680        // dbg!(tx.local_addr().unwrap());
1681        // dbg!(std::os::unix::prelude::AsRawFd::as_raw_fd(&tx));
1682
1683        // Create another, bound socket
1684        let rx_path = tmp.path().join("rx");
1685        let rx = UnixDatagram::bind(&rx_path).unwrap();
1686
1687        // Connect to the bound socket
1688        tx.connect(&rx_path).unwrap();
1689
1690        // Send to the bound socket
1691        let bytes = b"hello world";
1692        tx.send(bytes).await.unwrap();
1693
1694        let mut buf = vec![0u8; 24];
1695        let (size, _) = rx.recv_from(&mut buf).await.unwrap();
1696
1697        let dgram = &buf[..size];
1698        assert_eq!(dgram, bytes);
1699    }
1700
1701    #[cfg(unix)]
1702    #[tokio::test]
1703    async fn unix_datagram_chunked_gelf_messages() {
1704        assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async {
1705            let (tx, rx) = SourceSender::new_test();
1706            let in_path = tempfile::tempdir().unwrap().keep().join("unix_test");
1707            let mut config = UnixConfig::new(in_path.clone());
1708            config.decoding = GelfDeserializerConfig::default().into();
1709            let path = init_unix_with_config(tx, false, false, config).await;
1710            let seed = 42;
1711            let mut rng = SmallRng::seed_from_u64(seed);
1712            let max_size = 20;
1713            let big_message = "This is a very large message".repeat(5);
1714            let another_big_message = "This is another very large message".repeat(5);
1715            let mut chunks = get_gelf_chunks(big_message.as_str(), max_size, &mut rng);
1716            let mut another_chunks =
1717                get_gelf_chunks(another_big_message.as_str(), max_size, &mut rng);
1718            chunks.append(&mut another_chunks);
1719            chunks.shuffle(&mut rng);
1720
1721            send_packets_unix_datagram(path, chunks).await;
1722
1723            let events = collect_n(rx, 2).await;
1724            assert_eq!(
1725                events[0].as_log()[log_schema().message_key().unwrap().to_string()],
1726                big_message.into()
1727            );
1728            assert_eq!(
1729                events[1].as_log()[log_schema().message_key().unwrap().to_string()],
1730                another_big_message.into()
1731            );
1732        })
1733        .await;
1734    }
1735
1736    #[cfg(unix)]
1737    #[tokio::test]
1738    async fn unix_datagram_message_with_vector_namespace() {
1739        assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async {
1740            let (_, rx) = unix_message("test", false, true).await;
1741            let events = collect_n(rx, 1).await;
1742            let log = events[0].as_log();
1743            let event_meta = log.metadata().value();
1744
1745            assert_eq!(log.value(), &"test".into());
1746            assert_eq!(events.len(), 1);
1747
1748            assert_eq!(
1749                event_meta.get(path!("vector", "source_type")).unwrap(),
1750                &value!(SocketConfig::NAME)
1751            );
1752
1753            assert_eq!(
1754                event_meta.get(path!(SocketConfig::NAME, "host")).unwrap(),
1755                &value!(UNNAMED_SOCKET_HOST)
1756            );
1757        })
1758        .await;
1759    }
1760
1761    #[cfg(unix)]
1762    #[tokio::test]
1763    async fn unix_datagram_message_preserves_newline() {
1764        assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async {
1765            let (_, rx) = unix_message("foo\nbar", false, false).await;
1766            let events = collect_n(rx, 1).await;
1767
1768            assert_eq!(events.len(), 1);
1769            assert_eq!(
1770                events[0].as_log()[log_schema().message_key().unwrap().to_string()],
1771                "foo\nbar".into()
1772            );
1773            assert_eq!(
1774                events[0].as_log()[log_schema().source_type_key().unwrap().to_string()],
1775                "socket".into()
1776            );
1777        })
1778        .await;
1779    }
1780
1781    #[cfg(unix)]
1782    #[tokio::test]
1783    async fn unix_datagram_multiple_packets() {
1784        assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async {
1785            unix_multiple_packets(false).await
1786        })
1787        .await;
1788    }
1789
1790    #[cfg(unix)]
1791    #[test]
1792    fn parses_unix_datagram_config() {
1793        let config = parses_unix_config("unix_datagram");
1794        assert!(matches!(config.mode, Mode::UnixDatagram { .. }));
1795    }
1796
1797    #[cfg(unix)]
1798    #[test]
1799    fn parses_unix_datagram_perms() {
1800        let config = parses_unix_config_file_mode("unix_datagram");
1801        assert!(matches!(config.mode, Mode::UnixDatagram { .. }));
1802    }
1803
1804    #[cfg(unix)]
1805    #[tokio::test]
1806    async fn unix_datagram_permissions() {
1807        let in_path = tempfile::tempdir().unwrap().keep().join("unix_test");
1808        let (tx, _) = SourceSender::new_test();
1809
1810        let mut config = UnixConfig::new(in_path.clone());
1811        config.socket_file_mode = Some(0o555);
1812        let mode = Mode::UnixDatagram(config);
1813        let server = SocketConfig { mode }
1814            .build(SourceContext::new_test(tx, None))
1815            .await
1816            .unwrap();
1817        tokio::spawn(server);
1818
1819        wait_for(|| {
1820            match std::fs::metadata(&in_path) {
1821                Ok(meta) => {
1822                    match meta.permissions().mode() {
1823                        // S_IFSOCK   0140000   socket
1824                        0o140555 => ready(true),
1825                        _ => ready(false),
1826                    }
1827                }
1828                Err(_) => ready(false),
1829            }
1830        })
1831        .await;
1832    }
1833
1834    ////////////// UNIX STREAM TESTS //////////////
1835    #[cfg(unix)]
1836    async fn send_lines_unix_stream(path: PathBuf, lines: &[&str]) {
1837        let socket = UnixStream::connect(path).await.unwrap();
1838        let mut sink = FramedWrite::new(socket, LinesCodec::new());
1839
1840        let lines = lines.iter().map(|s| Ok(s.to_string()));
1841        let lines = lines.collect::<Vec<_>>();
1842        sink.send_all(&mut stream::iter(lines)).await.unwrap();
1843
1844        let mut socket = sink.into_inner();
1845        socket.shutdown().await.unwrap();
1846    }
1847
1848    #[cfg(unix)]
1849    #[tokio::test]
1850    async fn unix_stream_message() {
1851        assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async {
1852            let (_, rx) = unix_message("test", true, false).await;
1853            let events = collect_n(rx, 1).await;
1854
1855            assert_eq!(1, events.len());
1856            assert_eq!(
1857                events[0].as_log()[log_schema().message_key().unwrap().to_string()],
1858                "test".into()
1859            );
1860            assert_eq!(
1861                events[0].as_log()[log_schema().source_type_key().unwrap().to_string()],
1862                "socket".into()
1863            );
1864        })
1865        .await;
1866    }
1867
1868    #[cfg(unix)]
1869    #[tokio::test]
1870    async fn unix_stream_message_with_vector_namespace() {
1871        assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async {
1872            let (_, rx) = unix_message("test", true, true).await;
1873            let events = collect_n(rx, 1).await;
1874            let log = events[0].as_log();
1875            let event_meta = log.metadata().value();
1876
1877            assert_eq!(log.value(), &"test".into());
1878            assert_eq!(1, events.len());
1879            assert_eq!(
1880                event_meta.get(path!("vector", "source_type")).unwrap(),
1881                &value!(SocketConfig::NAME)
1882            );
1883            assert_eq!(
1884                event_meta.get(path!(SocketConfig::NAME, "host")).unwrap(),
1885                &value!(UNNAMED_SOCKET_HOST)
1886            );
1887        })
1888        .await;
1889    }
1890
1891    #[cfg(unix)]
1892    #[tokio::test]
1893    async fn unix_stream_message_splits_on_newline() {
1894        assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async {
1895            let (_, rx) = unix_message("foo\nbar", true, false).await;
1896            let events = collect_n(rx, 2).await;
1897
1898            assert_eq!(events.len(), 2);
1899            assert_eq!(
1900                events[0].as_log()[log_schema().message_key().unwrap().to_string()],
1901                "foo".into()
1902            );
1903            assert_eq!(
1904                events[0].as_log()[log_schema().source_type_key().unwrap().to_string()],
1905                "socket".into()
1906            );
1907            assert_eq!(
1908                events[1].as_log()[log_schema().message_key().unwrap().to_string()],
1909                "bar".into()
1910            );
1911            assert_eq!(
1912                events[1].as_log()[log_schema().source_type_key().unwrap().to_string()],
1913                "socket".into()
1914            );
1915        })
1916        .await;
1917    }
1918
1919    #[cfg(unix)]
1920    #[tokio::test]
1921    async fn unix_stream_multiple_packets() {
1922        assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async {
1923            unix_multiple_packets(true).await
1924        })
1925        .await;
1926    }
1927
1928    #[cfg(unix)]
1929    #[test]
1930    fn parses_new_unix_stream_config() {
1931        let config = parses_unix_config("unix_stream");
1932        assert!(matches!(config.mode, Mode::UnixStream { .. }));
1933    }
1934
1935    #[cfg(unix)]
1936    #[test]
1937    fn parses_new_unix_datagram_perms() {
1938        let config = parses_unix_config_file_mode("unix_stream");
1939        assert!(matches!(config.mode, Mode::UnixStream { .. }));
1940    }
1941
1942    #[cfg(unix)]
1943    #[test]
1944    fn parses_old_unix_stream_config() {
1945        let config = parses_unix_config("unix");
1946        assert!(matches!(config.mode, Mode::UnixStream { .. }));
1947    }
1948
1949    #[cfg(unix)]
1950    #[tokio::test]
1951    async fn unix_stream_permissions() {
1952        let in_path = tempfile::tempdir().unwrap().keep().join("unix_test");
1953        let (tx, _) = SourceSender::new_test();
1954
1955        let mut config = UnixConfig::new(in_path.clone());
1956        config.socket_file_mode = Some(0o421);
1957        let mode = Mode::UnixStream(config);
1958        let server = SocketConfig { mode }
1959            .build(SourceContext::new_test(tx, None))
1960            .await
1961            .unwrap();
1962        tokio::spawn(server);
1963
1964        wait_for(|| {
1965            match std::fs::metadata(&in_path) {
1966                Ok(meta) => {
1967                    match meta.permissions().mode() {
1968                        // S_IFSOCK   0140000   socket
1969                        0o140421 => ready(true),
1970                        _ => ready(false),
1971                    }
1972                }
1973                Err(_) => ready(false),
1974            }
1975        })
1976        .await;
1977    }
1978}