vector/sources/socket/
mod.rs

1pub mod tcp;
2pub mod udp;
3#[cfg(unix)]
4mod unix;
5
6use vector_lib::codecs::decoding::DeserializerConfig;
7use vector_lib::config::{log_schema, LegacyKey, LogNamespace};
8use vector_lib::configurable::configurable_component;
9use vector_lib::lookup::{lookup_v2::OptionalValuePath, owned_value_path};
10use vrl::value::{kind::Collection, Kind};
11
12use crate::{
13    codecs::DecodingConfig,
14    config::{GenerateConfig, Resource, SourceConfig, SourceContext, SourceOutput},
15    sources::util::net::TcpSource,
16    tls::MaybeTlsSettings,
17};
18
19/// Configuration for the `socket` source.
20#[configurable_component(source("socket", "Collect logs over a socket."))]
21#[derive(Clone, Debug)]
22pub struct SocketConfig {
23    #[serde(flatten)]
24    pub mode: Mode,
25}
26
27/// Listening mode for the `socket` source.
28#[configurable_component]
29#[derive(Clone, Debug)]
30#[serde(tag = "mode", rename_all = "snake_case")]
31#[configurable(metadata(docs::enum_tag_description = "The type of socket to use."))]
32#[allow(clippy::large_enum_variant)] // just used for configuration
33pub enum Mode {
34    /// Listen on TCP.
35    Tcp(tcp::TcpConfig),
36
37    /// Listen on UDP.
38    Udp(udp::UdpConfig),
39
40    /// Listen on a Unix domain socket (UDS), in datagram mode.
41    #[cfg(unix)]
42    UnixDatagram(unix::UnixConfig),
43
44    /// Listen on a Unix domain socket (UDS), in stream mode.
45    #[cfg(unix)]
46    #[serde(alias = "unix")]
47    UnixStream(unix::UnixConfig),
48}
49
50impl SocketConfig {
51    pub fn new_tcp(tcp_config: tcp::TcpConfig) -> Self {
52        tcp_config.into()
53    }
54
55    pub fn make_basic_tcp_config(addr: std::net::SocketAddr) -> Self {
56        tcp::TcpConfig::from_address(addr.into()).into()
57    }
58
59    fn decoding(&self) -> DeserializerConfig {
60        match &self.mode {
61            Mode::Tcp(config) => config.decoding().clone(),
62            Mode::Udp(config) => config.decoding().clone(),
63            #[cfg(unix)]
64            Mode::UnixDatagram(config) => config.decoding().clone(),
65            #[cfg(unix)]
66            Mode::UnixStream(config) => config.decoding().clone(),
67        }
68    }
69
70    fn log_namespace(&self, global_log_namespace: LogNamespace) -> LogNamespace {
71        match &self.mode {
72            Mode::Tcp(config) => global_log_namespace.merge(config.log_namespace),
73            Mode::Udp(config) => global_log_namespace.merge(config.log_namespace),
74            #[cfg(unix)]
75            Mode::UnixDatagram(config) => global_log_namespace.merge(config.log_namespace),
76            #[cfg(unix)]
77            Mode::UnixStream(config) => global_log_namespace.merge(config.log_namespace),
78        }
79    }
80}
81
82impl From<tcp::TcpConfig> for SocketConfig {
83    fn from(config: tcp::TcpConfig) -> Self {
84        SocketConfig {
85            mode: Mode::Tcp(config),
86        }
87    }
88}
89
90impl From<udp::UdpConfig> for SocketConfig {
91    fn from(config: udp::UdpConfig) -> Self {
92        SocketConfig {
93            mode: Mode::Udp(config),
94        }
95    }
96}
97
98impl GenerateConfig for SocketConfig {
99    fn generate_config() -> toml::Value {
100        toml::from_str(
101            r#"mode = "tcp"
102            address = "0.0.0.0:9000""#,
103        )
104        .unwrap()
105    }
106}
107
108#[async_trait::async_trait]
109#[typetag::serde(name = "socket")]
110impl SourceConfig for SocketConfig {
111    async fn build(&self, cx: SourceContext) -> crate::Result<super::Source> {
112        match self.mode.clone() {
113            Mode::Tcp(config) => {
114                let log_namespace = cx.log_namespace(config.log_namespace);
115
116                let decoding = config.decoding().clone();
117                let decoder = DecodingConfig::new(
118                    config
119                        .framing
120                        .clone()
121                        .unwrap_or_else(|| decoding.default_stream_framing()),
122                    decoding,
123                    log_namespace,
124                )
125                .build()?;
126
127                let tcp = tcp::RawTcpSource::new(config.clone(), decoder, log_namespace);
128                let tls_config = config.tls().as_ref().map(|tls| tls.tls_config.clone());
129                let tls_client_metadata_key = config
130                    .tls()
131                    .as_ref()
132                    .and_then(|tls| tls.client_metadata_key.clone())
133                    .and_then(|k| k.path);
134                let tls = MaybeTlsSettings::from_config(tls_config.as_ref(), true)?;
135                tcp.run(
136                    config.address(),
137                    config.keepalive(),
138                    config.shutdown_timeout_secs(),
139                    tls,
140                    tls_client_metadata_key,
141                    config.receive_buffer_bytes(),
142                    config.max_connection_duration_secs(),
143                    cx,
144                    false.into(),
145                    config.connection_limit,
146                    config.permit_origin.map(Into::into),
147                    SocketConfig::NAME,
148                    log_namespace,
149                )
150            }
151            Mode::Udp(config) => {
152                let log_namespace = cx.log_namespace(config.log_namespace);
153                let decoding = config.decoding().clone();
154                let framing = config
155                    .framing()
156                    .clone()
157                    .unwrap_or_else(|| decoding.default_message_based_framing());
158                let decoder = DecodingConfig::new(framing, decoding, log_namespace).build()?;
159                Ok(udp::udp(
160                    config,
161                    decoder,
162                    cx.shutdown,
163                    cx.out,
164                    log_namespace,
165                ))
166            }
167            #[cfg(unix)]
168            Mode::UnixDatagram(config) => {
169                let log_namespace = cx.log_namespace(config.log_namespace);
170                let decoding = config.decoding.clone();
171                let framing = config
172                    .framing
173                    .clone()
174                    .unwrap_or_else(|| decoding.default_message_based_framing());
175                let decoder = DecodingConfig::new(framing, decoding, log_namespace).build()?;
176
177                unix::unix_datagram(config, decoder, cx.shutdown, cx.out, log_namespace)
178            }
179            #[cfg(unix)]
180            Mode::UnixStream(config) => {
181                let log_namespace = cx.log_namespace(config.log_namespace);
182
183                let decoding = config.decoding().clone();
184                let decoder = DecodingConfig::new(
185                    config
186                        .framing
187                        .clone()
188                        .unwrap_or_else(|| decoding.default_stream_framing()),
189                    decoding,
190                    log_namespace,
191                )
192                .build()?;
193
194                unix::unix_stream(config, decoder, cx.shutdown, cx.out, log_namespace)
195            }
196        }
197    }
198
199    fn outputs(&self, global_log_namespace: LogNamespace) -> Vec<SourceOutput> {
200        let log_namespace = self.log_namespace(global_log_namespace);
201
202        let schema_definition = self
203            .decoding()
204            .schema_definition(log_namespace)
205            .with_standard_vector_source_metadata();
206
207        let schema_definition = match &self.mode {
208            Mode::Tcp(config) => {
209                let legacy_host_key = config.host_key().path.map(LegacyKey::InsertIfEmpty);
210
211                let legacy_port_key = config.port_key().clone().path.map(LegacyKey::InsertIfEmpty);
212
213                let tls_client_metadata_path = config
214                    .tls()
215                    .as_ref()
216                    .and_then(|tls| tls.client_metadata_key.as_ref())
217                    .and_then(|k| k.path.clone())
218                    .map(LegacyKey::Overwrite);
219
220                schema_definition
221                    .with_source_metadata(
222                        Self::NAME,
223                        legacy_host_key,
224                        &owned_value_path!("host"),
225                        Kind::bytes(),
226                        Some("host"),
227                    )
228                    .with_source_metadata(
229                        Self::NAME,
230                        legacy_port_key,
231                        &owned_value_path!("port"),
232                        Kind::integer(),
233                        None,
234                    )
235                    .with_source_metadata(
236                        Self::NAME,
237                        tls_client_metadata_path,
238                        &owned_value_path!("tls_client_metadata"),
239                        Kind::object(Collection::empty().with_unknown(Kind::bytes()))
240                            .or_undefined(),
241                        None,
242                    )
243            }
244            Mode::Udp(config) => {
245                let legacy_host_key = config.host_key().path.map(LegacyKey::InsertIfEmpty);
246
247                let legacy_port_key = config.port_key().clone().path.map(LegacyKey::InsertIfEmpty);
248
249                schema_definition
250                    .with_source_metadata(
251                        Self::NAME,
252                        legacy_host_key,
253                        &owned_value_path!("host"),
254                        Kind::bytes(),
255                        None,
256                    )
257                    .with_source_metadata(
258                        Self::NAME,
259                        legacy_port_key,
260                        &owned_value_path!("port"),
261                        Kind::integer(),
262                        None,
263                    )
264            }
265            #[cfg(unix)]
266            Mode::UnixDatagram(config) => {
267                let legacy_host_key = config.host_key().clone().path.map(LegacyKey::InsertIfEmpty);
268
269                schema_definition.with_source_metadata(
270                    Self::NAME,
271                    legacy_host_key,
272                    &owned_value_path!("host"),
273                    Kind::bytes(),
274                    None,
275                )
276            }
277            #[cfg(unix)]
278            Mode::UnixStream(config) => {
279                let legacy_host_key = config.host_key().clone().path.map(LegacyKey::InsertIfEmpty);
280
281                schema_definition.with_source_metadata(
282                    Self::NAME,
283                    legacy_host_key,
284                    &owned_value_path!("host"),
285                    Kind::bytes(),
286                    None,
287                )
288            }
289        };
290
291        vec![SourceOutput::new_maybe_logs(
292            self.decoding().output_type(),
293            schema_definition,
294        )]
295    }
296
297    fn resources(&self) -> Vec<Resource> {
298        match self.mode.clone() {
299            Mode::Tcp(tcp) => vec![tcp.address().as_tcp_resource()],
300            Mode::Udp(udp) => vec![udp.address().as_udp_resource()],
301            #[cfg(unix)]
302            Mode::UnixDatagram(_) => vec![],
303            #[cfg(unix)]
304            Mode::UnixStream(_) => vec![],
305        }
306    }
307
308    fn can_acknowledge(&self) -> bool {
309        false
310    }
311}
312
313pub(crate) fn default_host_key() -> OptionalValuePath {
314    log_schema().host_key().cloned().into()
315}
316
317#[cfg(test)]
318mod test {
319    use approx::assert_relative_eq;
320    use std::{
321        collections::HashMap,
322        net::{IpAddr, Ipv4Addr, SocketAddr, UdpSocket},
323        sync::{
324            atomic::{AtomicBool, Ordering},
325            Arc,
326        },
327        thread,
328    };
329
330    use bytes::{BufMut, Bytes, BytesMut};
331    use futures::{stream, StreamExt};
332    use rand::{rngs::SmallRng, seq::SliceRandom, SeedableRng};
333    use serde_json::json;
334    use tokio::io::AsyncReadExt;
335    use tokio::net::TcpStream;
336    use tokio::{
337        task::JoinHandle,
338        time::{timeout, Duration, Instant},
339    };
340    #[cfg(unix)]
341    use vector_lib::codecs::{
342        decoding::CharacterDelimitedDecoderOptions, CharacterDelimitedDecoderConfig,
343    };
344    use vector_lib::codecs::{GelfDeserializerConfig, NewlineDelimitedDecoderConfig};
345    use vector_lib::event::EventContainer;
346    use vector_lib::lookup::{lookup_v2::OptionalValuePath, owned_value_path, path};
347    use vrl::value::ObjectMap;
348    use vrl::{btreemap, value};
349
350    #[cfg(unix)]
351    use {
352        super::{unix::UnixConfig, Mode},
353        crate::sources::util::unix::UNNAMED_SOCKET_HOST,
354        crate::test_util::wait_for,
355        futures::{SinkExt, Stream},
356        std::future::ready,
357        std::os::unix::fs::PermissionsExt,
358        std::path::PathBuf,
359        tokio::{
360            io::AsyncWriteExt,
361            net::{UnixDatagram, UnixStream},
362            task::yield_now,
363        },
364        tokio_util::codec::{FramedWrite, LinesCodec},
365    };
366
367    use super::{tcp::TcpConfig, udp::UdpConfig, SocketConfig};
368    use crate::{
369        config::{log_schema, ComponentKey, GlobalOptions, SourceConfig, SourceContext},
370        event::{Event, LogEvent},
371        shutdown::{ShutdownSignal, SourceShutdownCoordinator},
372        sinks::util::tcp::TcpSinkConfig,
373        sources::util::net::SocketListenAddr,
374        test_util::{
375            collect_n, collect_n_limited,
376            components::{
377                assert_source_compliance, assert_source_error, COMPONENT_ERROR_TAGS,
378                SOCKET_PUSH_SOURCE_TAGS,
379            },
380            next_addr, next_addr_any, random_string, send_lines, send_lines_tls, wait_for_tcp,
381        },
382        tls::{self, TlsConfig, TlsEnableableConfig, TlsSourceConfig},
383        SourceSender,
384    };
385
386    fn get_gelf_payload(message: &str) -> String {
387        serde_json::to_string(&json!({
388            "version": "1.1",
389            "host": "example.org",
390            "short_message": message,
391            "timestamp": 1234567890.123,
392            "level": 6,
393            "_foo": "bar",
394        }))
395        .unwrap()
396    }
397
398    fn create_gelf_chunk(
399        message_id: u64,
400        sequence_number: u8,
401        total_chunks: u8,
402        payload: &[u8],
403    ) -> Bytes {
404        const GELF_MAGIC: [u8; 2] = [0x1e, 0x0f];
405        let mut chunk = BytesMut::new();
406        chunk.put_slice(&GELF_MAGIC);
407        chunk.put_u64(message_id);
408        chunk.put_u8(sequence_number);
409        chunk.put_u8(total_chunks);
410        chunk.put(payload);
411        chunk.freeze()
412    }
413
414    fn get_gelf_chunks(short_message: &str, max_size: usize, rng: &mut SmallRng) -> Vec<Bytes> {
415        let message_id = rand::random();
416        let payload = get_gelf_payload(short_message);
417        let payload_chunks = payload.as_bytes().chunks(max_size).collect::<Vec<_>>();
418        let total_chunks = payload_chunks.len();
419        assert!(total_chunks <= 128, "too many gelf chunks");
420
421        let mut chunks = payload_chunks
422            .into_iter()
423            .enumerate()
424            .map(|(i, payload_chunk)| {
425                create_gelf_chunk(message_id, i as u8, total_chunks as u8, payload_chunk)
426            })
427            .collect::<Vec<_>>();
428        // Shuffle the chunks to simulate out-of-order delivery
429        chunks.shuffle(rng);
430        chunks
431    }
432
433    #[test]
434    fn generate_config() {
435        crate::test_util::test_generate_config::<SocketConfig>();
436    }
437
438    //////// TCP TESTS ////////
439    #[tokio::test]
440    async fn tcp_it_includes_host() {
441        assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async {
442            let (tx, mut rx) = SourceSender::new_test();
443            let addr = next_addr();
444
445            let server = SocketConfig::from(TcpConfig::from_address(addr.into()))
446                .build(SourceContext::new_test(tx, None))
447                .await
448                .unwrap();
449            tokio::spawn(server);
450
451            wait_for_tcp(addr).await;
452            let addr = send_lines(addr, vec!["test".to_owned()].into_iter())
453                .await
454                .unwrap();
455
456            let event = rx.next().await.unwrap();
457
458            assert_eq!(event.as_log()["host"], addr.ip().to_string().into());
459            assert_eq!(event.as_log()["port"], addr.port().into());
460        })
461        .await;
462    }
463
464    #[tokio::test]
465    async fn tcp_it_includes_vector_namespaced_fields() {
466        assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async {
467            let (tx, mut rx) = SourceSender::new_test();
468            let addr = next_addr();
469            let mut conf = TcpConfig::from_address(addr.into());
470            conf.set_log_namespace(Some(true));
471
472            let server = SocketConfig::from(conf)
473                .build(SourceContext::new_test(tx, None))
474                .await
475                .unwrap();
476            tokio::spawn(server);
477
478            wait_for_tcp(addr).await;
479            let addr = send_lines(addr, vec!["test".to_owned()].into_iter())
480                .await
481                .unwrap();
482
483            let event = rx.next().await.unwrap();
484            let log = event.as_log();
485            let event_meta = log.metadata().value();
486
487            assert_eq!(log.value(), &"test".into());
488            assert_eq!(
489                event_meta.get(path!("vector", "source_type")).unwrap(),
490                &value!(SocketConfig::NAME)
491            );
492            assert_eq!(
493                event_meta.get(path!(SocketConfig::NAME, "host")).unwrap(),
494                &value!(addr.ip().to_string())
495            );
496            assert_eq!(
497                event_meta.get(path!(SocketConfig::NAME, "port")).unwrap(),
498                &value!(addr.port())
499            );
500        })
501        .await;
502    }
503
504    #[tokio::test]
505    async fn tcp_splits_on_newline() {
506        assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async {
507            let (tx, rx) = SourceSender::new_test();
508            let addr = next_addr();
509
510            let server = SocketConfig::from(TcpConfig::from_address(addr.into()))
511                .build(SourceContext::new_test(tx, None))
512                .await
513                .unwrap();
514            tokio::spawn(server);
515
516            wait_for_tcp(addr).await;
517            send_lines(addr, vec!["foo\nbar".to_owned()].into_iter())
518                .await
519                .unwrap();
520
521            let events = collect_n(rx, 2).await;
522
523            assert_eq!(events.len(), 2);
524            assert_eq!(
525                events[0].as_log()[log_schema().message_key().unwrap().to_string()],
526                "foo".into()
527            );
528            assert_eq!(
529                events[1].as_log()[log_schema().message_key().unwrap().to_string()],
530                "bar".into()
531            );
532        })
533        .await;
534    }
535
536    #[tokio::test]
537    async fn tcp_it_includes_source_type() {
538        assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async {
539            let (tx, mut rx) = SourceSender::new_test();
540            let addr = next_addr();
541
542            let server = SocketConfig::from(TcpConfig::from_address(addr.into()))
543                .build(SourceContext::new_test(tx, None))
544                .await
545                .unwrap();
546            tokio::spawn(server);
547
548            wait_for_tcp(addr).await;
549            send_lines(addr, vec!["test".to_owned()].into_iter())
550                .await
551                .unwrap();
552
553            let event = rx.next().await.unwrap();
554            assert_eq!(
555                event.as_log()[log_schema().source_type_key().unwrap().to_string()],
556                "socket".into()
557            );
558        })
559        .await;
560    }
561
562    #[tokio::test]
563    async fn tcp_continue_after_long_line() {
564        assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async {
565            let (tx, mut rx) = SourceSender::new_test();
566            let addr = next_addr();
567
568            let mut config = TcpConfig::from_address(addr.into());
569            config.set_framing(Some(
570                NewlineDelimitedDecoderConfig::new_with_max_length(10).into(),
571            ));
572
573            let server = SocketConfig::from(config)
574                .build(SourceContext::new_test(tx, None))
575                .await
576                .unwrap();
577            tokio::spawn(server);
578
579            let lines = vec![
580                "short".to_owned(),
581                "this is too long".to_owned(),
582                "more short".to_owned(),
583            ];
584
585            wait_for_tcp(addr).await;
586            send_lines(addr, lines.into_iter()).await.unwrap();
587
588            let event = rx.next().await.unwrap();
589            assert_eq!(
590                event.as_log()[log_schema().message_key().unwrap().to_string()],
591                "short".into()
592            );
593
594            let event = rx.next().await.unwrap();
595            assert_eq!(
596                event.as_log()[log_schema().message_key().unwrap().to_string()],
597                "more short".into()
598            );
599        })
600        .await;
601    }
602
603    #[tokio::test]
604    async fn tcp_with_tls() {
605        assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async {
606            let (tx, mut rx) = SourceSender::new_test();
607            let addr = next_addr();
608
609            let mut config = TcpConfig::from_address(addr.into());
610            config.set_tls(Some(TlsSourceConfig {
611                tls_config: TlsEnableableConfig {
612                    enabled: Some(true),
613                    options: TlsConfig {
614                        verify_certificate: Some(true),
615                        crt_file: Some(tls::TEST_PEM_CRT_PATH.into()),
616                        key_file: Some(tls::TEST_PEM_KEY_PATH.into()),
617                        ca_file: Some(tls::TEST_PEM_CA_PATH.into()),
618                        ..Default::default()
619                    },
620                },
621                client_metadata_key: Some(OptionalValuePath::from(owned_value_path!("tls_peer"))),
622            }));
623
624            let server = SocketConfig::from(config)
625                .build(SourceContext::new_test(tx, None))
626                .await
627                .unwrap();
628            tokio::spawn(server);
629
630            let lines = vec!["one line".to_owned(), "another line".to_owned()];
631
632            wait_for_tcp(addr).await;
633            send_lines_tls(
634                addr,
635                "localhost".into(),
636                lines.into_iter(),
637                std::path::Path::new(tls::TEST_PEM_CA_PATH),
638                std::path::Path::new(tls::TEST_PEM_CLIENT_CRT_PATH),
639                std::path::Path::new(tls::TEST_PEM_CLIENT_KEY_PATH),
640            )
641            .await
642            .unwrap();
643
644            let event = rx.next().await.unwrap();
645            assert_eq!(
646                event.as_log()[log_schema().message_key().unwrap().to_string()],
647                "one line".into()
648            );
649
650            let tls_meta: ObjectMap = btreemap!(
651                "subject" => "CN=localhost,OU=Vector,O=Datadog,L=New York,ST=New York,C=US"
652            );
653
654            assert_eq!(event.as_log()["tls_peer"], tls_meta.clone().into(),);
655
656            let event = rx.next().await.unwrap();
657            assert_eq!(
658                event.as_log()[log_schema().message_key().unwrap().to_string()],
659                "another line".into()
660            );
661
662            assert_eq!(event.as_log()["tls_peer"], tls_meta.clone().into(),);
663        })
664        .await;
665    }
666
667    #[tokio::test]
668    async fn tcp_with_tls_vector_namespace() {
669        assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async {
670            let (tx, mut rx) = SourceSender::new_test();
671            let addr = next_addr();
672
673            let mut config = TcpConfig::from_address(addr.into());
674            config.set_tls(Some(TlsSourceConfig {
675                tls_config: TlsEnableableConfig {
676                    enabled: Some(true),
677                    options: TlsConfig {
678                        verify_certificate: Some(true),
679                        crt_file: Some(tls::TEST_PEM_CRT_PATH.into()),
680                        key_file: Some(tls::TEST_PEM_KEY_PATH.into()),
681                        ca_file: Some(tls::TEST_PEM_CA_PATH.into()),
682                        ..Default::default()
683                    },
684                },
685                client_metadata_key: None,
686            }));
687            config.log_namespace = Some(true);
688
689            let server = SocketConfig::from(config)
690                .build(SourceContext::new_test(tx, None))
691                .await
692                .unwrap();
693            tokio::spawn(server);
694
695            let lines = vec!["one line".to_owned(), "another line".to_owned()];
696
697            wait_for_tcp(addr).await;
698            send_lines_tls(
699                addr,
700                "localhost".into(),
701                lines.into_iter(),
702                std::path::Path::new(tls::TEST_PEM_CA_PATH),
703                std::path::Path::new(tls::TEST_PEM_CLIENT_CRT_PATH),
704                std::path::Path::new(tls::TEST_PEM_CLIENT_KEY_PATH),
705            )
706            .await
707            .unwrap();
708
709            let event = rx.next().await.unwrap();
710            let log = event.as_log();
711            let event_meta = log.metadata().value();
712
713            assert_eq!(log.value(), &"one line".into());
714
715            let tls_meta: ObjectMap = btreemap!(
716                "subject" => "CN=localhost,OU=Vector,O=Datadog,L=New York,ST=New York,C=US"
717            );
718
719            assert_eq!(
720                event_meta
721                    .get(path!(SocketConfig::NAME, "tls_client_metadata"))
722                    .unwrap(),
723                &value!(tls_meta.clone())
724            );
725
726            let event = rx.next().await.unwrap();
727            let log = event.as_log();
728            let event_meta = log.metadata().value();
729
730            assert_eq!(log.value(), &"another line".into());
731
732            assert_eq!(
733                event_meta
734                    .get(path!(SocketConfig::NAME, "tls_client_metadata"))
735                    .unwrap(),
736                &value!(tls_meta.clone())
737            );
738        })
739        .await;
740    }
741
742    #[tokio::test]
743    async fn tcp_shutdown_simple() {
744        assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async {
745            let source_id = ComponentKey::from("tcp_shutdown_simple");
746            let (tx, mut rx) = SourceSender::new_test();
747            let addr = next_addr();
748            let (cx, mut shutdown) = SourceContext::new_shutdown(&source_id, tx);
749
750            // Start TCP Source
751            let server = SocketConfig::from(TcpConfig::from_address(addr.into()))
752                .build(cx)
753                .await
754                .unwrap();
755            let source_handle = tokio::spawn(server);
756
757            // Send data to Source.
758            wait_for_tcp(addr).await;
759            send_lines(addr, vec!["test".to_owned()].into_iter())
760                .await
761                .unwrap();
762
763            let event = rx.next().await.unwrap();
764            assert_eq!(
765                event.as_log()[log_schema().message_key().unwrap().to_string()],
766                "test".into()
767            );
768
769            // Now signal to the Source to shut down.
770            let deadline = Instant::now() + Duration::from_secs(10);
771            let shutdown_complete = shutdown.shutdown_source(&source_id, deadline);
772            let shutdown_success = shutdown_complete.await;
773            assert!(shutdown_success);
774
775            // Ensure source actually shut down successfully.
776            _ = source_handle.await.unwrap();
777        })
778        .await;
779    }
780
781    // Intentially not using assert_source_compliance here because this is a round-trip test which
782    // means source and sink will both emit `EventsSent` , triggering multi-emission check.
783    #[tokio::test]
784    async fn tcp_shutdown_infinite_stream() {
785        // We create our TCP source with a larger-than-normal send buffer, which helps ensure that
786        // the source doesn't block on sending the events downstream, otherwise if it was blocked on
787        // doing so, it wouldn't be able to wake up and loop to see that it had been signalled to
788        // shutdown.
789        let addr = next_addr();
790
791        let (source_tx, source_rx) = SourceSender::new_test_sender_with_buffer(10_000);
792        let source_key = ComponentKey::from("tcp_shutdown_infinite_stream");
793        let (source_cx, mut shutdown) = SourceContext::new_shutdown(&source_key, source_tx);
794
795        let mut source_config = TcpConfig::from_address(addr.into());
796        source_config.set_shutdown_timeout_secs(1);
797        let source_task = SocketConfig::from(source_config)
798            .build(source_cx)
799            .await
800            .unwrap();
801
802        // Spawn the source task and wait until we're sure it's listening:
803        let source_handle = tokio::spawn(source_task);
804        wait_for_tcp(addr).await;
805
806        // Now we create a TCP _sink_ which we'll feed with an infinite stream of events to ship to
807        // our TCP source.  This will ensure that our TCP source is fully-loaded as we try to shut
808        // it down, exercising the logic we have to ensure timely shutdown even under load:
809        let message = random_string(512);
810        let message_bytes = Bytes::from(message.clone());
811
812        #[derive(Clone, Debug)]
813        struct Serializer {
814            bytes: Bytes,
815        }
816        impl tokio_util::codec::Encoder<Event> for Serializer {
817            type Error = vector_lib::codecs::encoding::Error;
818
819            fn encode(&mut self, _: Event, buffer: &mut BytesMut) -> Result<(), Self::Error> {
820                buffer.put(self.bytes.as_ref());
821                buffer.put_u8(b'\n');
822                Ok(())
823            }
824        }
825        let sink_config = TcpSinkConfig::from_address(format!("localhost:{}", addr.port()));
826        let encoder = Serializer {
827            bytes: message_bytes,
828        };
829        let (sink, _healthcheck) = sink_config.build(Default::default(), encoder).unwrap();
830
831        tokio::spawn(async move {
832            let input = stream::repeat_with(|| LogEvent::default().into()).boxed();
833            sink.run(input).await.unwrap();
834        });
835
836        // Now with our sink running, feeding events to the source, collect 100 event arrays from
837        // the source and make sure each event within them matches the single message we repeatedly
838        // sent via the sink:
839        let events = collect_n_limited(source_rx, 100)
840            .await
841            .into_iter()
842            .collect::<Vec<_>>();
843        assert_eq!(100, events.len());
844
845        let message_key = log_schema().message_key().unwrap().to_string();
846        let expected_message = message.clone().into();
847        for event in events.into_iter().flat_map(EventContainer::into_events) {
848            assert_eq!(event.as_log()[message_key.as_str()], expected_message);
849        }
850
851        // Now trigger shutdown on the source and ensure that it shuts down before or at the
852        // deadline, and make sure the source task actually finished as well:
853        let shutdown_timeout_limit = Duration::from_secs(10);
854        let deadline = Instant::now() + shutdown_timeout_limit;
855        let shutdown_complete = shutdown.shutdown_source(&source_key, deadline);
856
857        let shutdown_result = timeout(shutdown_timeout_limit, shutdown_complete).await;
858        assert_eq!(shutdown_result, Ok(true));
859
860        let source_result = source_handle.await.expect("source task should not panic");
861        assert_eq!(source_result, Ok(()));
862    }
863
864    #[tokio::test]
865    async fn tcp_connection_close_after_max_duration() {
866        let (tx, _) = SourceSender::new_test();
867        let addr = next_addr();
868
869        let mut source_config = TcpConfig::from_address(addr.into());
870        source_config.set_max_connection_duration_secs(Some(1));
871        let source_task = SocketConfig::from(source_config)
872            .build(SourceContext::new_test(tx, None))
873            .await
874            .unwrap();
875
876        // Spawn the source task and wait until we're sure it's listening:
877        drop(tokio::spawn(source_task));
878        wait_for_tcp(addr).await;
879
880        let mut stream: TcpStream = TcpStream::connect(addr)
881            .await
882            .expect("stream should be able to connect");
883        let start = Instant::now();
884
885        let timeout = tokio::time::sleep(Duration::from_millis(1200));
886        let mut buffer = [0u8; 10];
887
888        tokio::select! {
889             _ = timeout => {
890                 panic!("timed out waiting for stream to close")
891             },
892             read_result = stream.read(&mut buffer) => {
893                 match read_result {
894                    // read resulting with 0 bytes -> the connection was closed
895                    Ok(0) => assert_relative_eq!(start.elapsed().as_secs_f64(), 1.0, epsilon = 0.3),
896                    Ok(_) => panic!("unexpectedly read data from stream"),
897                    Err(e) => panic!("{e:}")
898                 }
899             }
900        }
901    }
902
903    //////// UDP TESTS ////////
904    fn send_lines_udp(to: SocketAddr, lines: impl IntoIterator<Item = String>) -> SocketAddr {
905        send_lines_udp_from(next_addr(), to, lines)
906    }
907
908    fn send_lines_udp_from(
909        from: SocketAddr,
910        to: SocketAddr,
911        lines: impl IntoIterator<Item = String>,
912    ) -> SocketAddr {
913        send_packets_udp_from(from, to, lines.into_iter().map(|line| line.into()))
914    }
915
916    fn send_packets_udp(to: SocketAddr, packets: impl IntoIterator<Item = Bytes>) -> SocketAddr {
917        send_packets_udp_from(next_addr(), to, packets)
918    }
919
920    fn send_packets_udp_from(
921        from: SocketAddr,
922        to: SocketAddr,
923        packets: impl IntoIterator<Item = Bytes>,
924    ) -> SocketAddr {
925        let socket = UdpSocket::bind(from)
926            .map_err(|error| panic!("{error:}"))
927            .ok()
928            .unwrap();
929
930        for packet in packets {
931            assert_eq!(
932                socket
933                    .send_to(&packet, to)
934                    .map_err(|error| panic!("{error:}"))
935                    .ok()
936                    .unwrap(),
937                packet.len()
938            );
939            // Space things out slightly to try to avoid dropped packets
940            thread::sleep(Duration::from_millis(1));
941        }
942
943        // Give packets some time to flow through
944        thread::sleep(Duration::from_millis(10));
945
946        // Done
947        from
948    }
949
950    async fn init_udp_with_shutdown(
951        sender: SourceSender,
952        source_id: &ComponentKey,
953        shutdown: &mut SourceShutdownCoordinator,
954    ) -> (SocketAddr, JoinHandle<Result<(), ()>>) {
955        let (shutdown_signal, _) = shutdown.register_source(source_id, false);
956        init_udp_inner(sender, source_id, shutdown_signal, None, false).await
957    }
958
959    async fn init_udp(sender: SourceSender, use_log_namespace: bool) -> SocketAddr {
960        let (addr, _handle) = init_udp_inner(
961            sender,
962            &ComponentKey::from("default"),
963            ShutdownSignal::noop(),
964            None,
965            use_log_namespace,
966        )
967        .await;
968        addr
969    }
970
971    async fn init_udp_with_config(sender: SourceSender, config: UdpConfig) -> SocketAddr {
972        let (addr, _handle) = init_udp_inner(
973            sender,
974            &ComponentKey::from("default"),
975            ShutdownSignal::noop(),
976            Some(config),
977            false,
978        )
979        .await;
980        addr
981    }
982
983    async fn init_udp_inner(
984        sender: SourceSender,
985        source_key: &ComponentKey,
986        shutdown_signal: ShutdownSignal,
987        config: Option<UdpConfig>,
988        use_vector_namespace: bool,
989    ) -> (SocketAddr, JoinHandle<Result<(), ()>>) {
990        let (address, mut config) = match config {
991            Some(config) => match config.address() {
992                SocketListenAddr::SocketAddr(addr) => (addr, config),
993                _ => panic!("listen address should not be systemd FD offset in tests"),
994            },
995            None => {
996                let address = next_addr();
997                (address, UdpConfig::from_address(address.into()))
998            }
999        };
1000
1001        let config = if use_vector_namespace {
1002            config.set_log_namespace(Some(true));
1003            config
1004        } else {
1005            config
1006        };
1007
1008        let server = SocketConfig::from(config)
1009            .build(SourceContext {
1010                key: source_key.clone(),
1011                globals: GlobalOptions::default(),
1012                enrichment_tables: Default::default(),
1013                shutdown: shutdown_signal,
1014                out: sender,
1015                proxy: Default::default(),
1016                acknowledgements: false,
1017                schema: Default::default(),
1018                schema_definitions: HashMap::default(),
1019                extra_context: Default::default(),
1020            })
1021            .await
1022            .unwrap();
1023        let source_handle = tokio::spawn(server);
1024
1025        // Wait for UDP to start listening
1026        tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
1027
1028        (address, source_handle)
1029    }
1030
1031    #[tokio::test]
1032    async fn udp_message() {
1033        assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async {
1034            let (tx, rx) = SourceSender::new_test();
1035            let address = init_udp(tx, false).await;
1036
1037            send_lines_udp(address, vec!["test".to_string()]);
1038            let events = collect_n(rx, 1).await;
1039
1040            assert_eq!(
1041                events[0].as_log()[log_schema().message_key().unwrap().to_string()],
1042                "test".into()
1043            );
1044        })
1045        .await;
1046    }
1047
1048    #[tokio::test]
1049    async fn udp_message_preserves_newline() {
1050        assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async {
1051            let (tx, rx) = SourceSender::new_test();
1052            let address = init_udp(tx, false).await;
1053
1054            send_lines_udp(address, vec!["foo\nbar".to_string()]);
1055            let events = collect_n(rx, 1).await;
1056
1057            assert_eq!(
1058                events[0].as_log()[log_schema().message_key().unwrap().to_string()],
1059                "foo\nbar".into()
1060            );
1061        })
1062        .await;
1063    }
1064
1065    #[tokio::test]
1066    async fn udp_multiple_packets() {
1067        assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async {
1068            let (tx, rx) = SourceSender::new_test();
1069            let address = init_udp(tx, false).await;
1070
1071            send_lines_udp(address, vec!["test".to_string(), "test2".to_string()]);
1072            let events = collect_n(rx, 2).await;
1073
1074            assert_eq!(
1075                events[0].as_log()[log_schema().message_key().unwrap().to_string()],
1076                "test".into()
1077            );
1078            assert_eq!(
1079                events[1].as_log()[log_schema().message_key().unwrap().to_string()],
1080                "test2".into()
1081            );
1082        })
1083        .await;
1084    }
1085
1086    #[tokio::test]
1087    async fn udp_max_length() {
1088        assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async {
1089            let (tx, rx) = SourceSender::new_test();
1090            let address = next_addr();
1091            let mut config = UdpConfig::from_address(address.into());
1092            config.max_length = 11;
1093            let address = init_udp_with_config(tx, config).await;
1094
1095            send_lines_udp(
1096                address,
1097                vec![
1098                    "short line".to_string(),
1099                    "test with a long line".to_string(),
1100                    "a short un".to_string(),
1101                ],
1102            );
1103
1104            let events = collect_n(rx, 2).await;
1105            assert_eq!(
1106                events[0].as_log()[log_schema().message_key().unwrap().to_string()],
1107                "short line".into()
1108            );
1109            assert_eq!(
1110                events[1].as_log()[log_schema().message_key().unwrap().to_string()],
1111                "a short un".into()
1112            );
1113        })
1114        .await;
1115    }
1116
1117    #[cfg(unix)]
1118    #[tokio::test]
1119    /// This test only works on Unix.
1120    /// Unix truncates at max_length giving us the bytes to get the first n delimited messages.
1121    /// Windows will drop the entire packet if we exceed the max_length so we are unable to
1122    /// extract anything.
1123    async fn udp_max_length_delimited() {
1124        assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async {
1125            let (tx, rx) = SourceSender::new_test();
1126            let address = next_addr();
1127            let mut config = UdpConfig::from_address(address.into());
1128            config.max_length = 10;
1129            config.framing = Some(
1130                CharacterDelimitedDecoderConfig {
1131                    character_delimited: CharacterDelimitedDecoderOptions::new(b',', None),
1132                }
1133                .into(),
1134            );
1135            let address = init_udp_with_config(tx, config).await;
1136
1137            send_lines_udp(
1138                address,
1139                vec!["test with, long line".to_string(), "short one".to_string()],
1140            );
1141
1142            let events = collect_n(rx, 2).await;
1143            assert_eq!(
1144                events[0].as_log()[log_schema().message_key().unwrap().to_string()],
1145                "test with".into()
1146            );
1147            assert_eq!(
1148                events[1].as_log()[log_schema().message_key().unwrap().to_string()],
1149                "short one".into()
1150            );
1151        })
1152        .await;
1153    }
1154
1155    #[tokio::test]
1156    async fn udp_decodes_chunked_gelf_messages() {
1157        assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async {
1158            let (tx, rx) = SourceSender::new_test();
1159            let address = next_addr();
1160            let mut config = UdpConfig::from_address(address.into());
1161            config.decoding = GelfDeserializerConfig::default().into();
1162            let address = init_udp_with_config(tx, config).await;
1163            let seed = 42;
1164            let mut rng = SmallRng::seed_from_u64(seed);
1165            let max_size = 300;
1166            let big_message = "This is a very large message".repeat(500);
1167            let another_big_message = "This is another very large message".repeat(500);
1168            let mut chunks = get_gelf_chunks(big_message.as_str(), max_size, &mut rng);
1169            let mut another_chunks =
1170                get_gelf_chunks(another_big_message.as_str(), max_size, &mut rng);
1171            chunks.append(&mut another_chunks);
1172            chunks.shuffle(&mut rng);
1173
1174            send_packets_udp(address, chunks);
1175
1176            let events = collect_n(rx, 2).await;
1177            assert_eq!(
1178                events[1].as_log()[log_schema().message_key().unwrap().to_string()],
1179                big_message.into()
1180            );
1181            assert_eq!(
1182                events[0].as_log()[log_schema().message_key().unwrap().to_string()],
1183                another_big_message.into()
1184            );
1185        })
1186        .await;
1187    }
1188
1189    #[tokio::test]
1190    async fn udp_it_includes_host() {
1191        assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async {
1192            let (tx, rx) = SourceSender::new_test();
1193            let address = init_udp(tx, false).await;
1194
1195            let from = send_lines_udp(address, vec!["test".to_string()]);
1196            let events = collect_n(rx, 1).await;
1197
1198            assert_eq!(events[0].as_log()["host"], from.ip().to_string().into());
1199            assert_eq!(events[0].as_log()["port"], from.port().into());
1200        })
1201        .await;
1202    }
1203
1204    #[tokio::test]
1205    async fn udp_it_includes_vector_namespaced_fields() {
1206        assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async {
1207            let (tx, rx) = SourceSender::new_test();
1208            let address = init_udp(tx, true).await;
1209
1210            let from = send_lines_udp(address, vec!["test".to_string()]);
1211            let events = collect_n(rx, 1).await;
1212            let log = events[0].as_log();
1213            let event_meta = log.metadata().value();
1214
1215            assert_eq!(log.value(), &"test".into());
1216            assert_eq!(
1217                event_meta.get(path!("vector", "source_type")).unwrap(),
1218                &value!(SocketConfig::NAME)
1219            );
1220            assert_eq!(
1221                event_meta.get(path!(SocketConfig::NAME, "host")).unwrap(),
1222                &value!(from.ip().to_string())
1223            );
1224            assert_eq!(
1225                event_meta.get(path!(SocketConfig::NAME, "port")).unwrap(),
1226                &value!(from.port())
1227            );
1228        })
1229        .await;
1230    }
1231
1232    #[tokio::test]
1233    async fn udp_it_includes_source_type() {
1234        assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async {
1235            let (tx, rx) = SourceSender::new_test();
1236            let address = init_udp(tx, false).await;
1237
1238            _ = send_lines_udp(address, vec!["test".to_string()]);
1239            let events = collect_n(rx, 1).await;
1240
1241            assert_eq!(
1242                events[0].as_log()[log_schema().source_type_key().unwrap().to_string()],
1243                "socket".into()
1244            );
1245        })
1246        .await;
1247    }
1248
1249    #[tokio::test]
1250    async fn udp_shutdown_simple() {
1251        assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async {
1252            let (tx, rx) = SourceSender::new_test();
1253            let source_id = ComponentKey::from("udp_shutdown_simple");
1254
1255            let mut shutdown = SourceShutdownCoordinator::default();
1256            let (address, source_handle) =
1257                init_udp_with_shutdown(tx, &source_id, &mut shutdown).await;
1258
1259            send_lines_udp(address, vec!["test".to_string()]);
1260            let events = collect_n(rx, 1).await;
1261
1262            assert_eq!(
1263                events[0].as_log()[log_schema().message_key().unwrap().to_string()],
1264                "test".into()
1265            );
1266
1267            // Now signal to the Source to shut down.
1268            let deadline = Instant::now() + Duration::from_secs(10);
1269            let shutdown_complete = shutdown.shutdown_source(&source_id, deadline);
1270            let shutdown_success = shutdown_complete.await;
1271            assert!(shutdown_success);
1272
1273            // Ensure source actually shut down successfully.
1274            _ = source_handle.await.unwrap();
1275        })
1276        .await;
1277    }
1278
1279    #[tokio::test]
1280    async fn udp_shutdown_infinite_stream() {
1281        assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async {
1282            let (tx, rx) = SourceSender::new_test();
1283            let source_id = ComponentKey::from("udp_shutdown_infinite_stream");
1284
1285            let mut shutdown = SourceShutdownCoordinator::default();
1286            let (address, source_handle) =
1287                init_udp_with_shutdown(tx, &source_id, &mut shutdown).await;
1288
1289            // Stream that keeps sending lines to the UDP source forever.
1290            let run_pump_atomic_sender = Arc::new(AtomicBool::new(true));
1291            let run_pump_atomic_receiver = Arc::clone(&run_pump_atomic_sender);
1292            let pump_handle = std::thread::spawn(move || {
1293                send_lines_udp(
1294                    address,
1295                    std::iter::repeat("test".to_string())
1296                        .take_while(move |_| run_pump_atomic_receiver.load(Ordering::Relaxed)),
1297                );
1298            });
1299
1300            // Important that 'rx' doesn't get dropped until the pump has finished sending items to it.
1301            let events = collect_n(rx, 100).await;
1302            assert_eq!(100, events.len());
1303            for event in events {
1304                assert_eq!(
1305                    event.as_log()[log_schema().message_key().unwrap().to_string()],
1306                    "test".into()
1307                );
1308            }
1309
1310            let deadline = Instant::now() + Duration::from_secs(10);
1311            let shutdown_complete = shutdown.shutdown_source(&source_id, deadline);
1312            let shutdown_success = shutdown_complete.await;
1313            assert!(shutdown_success);
1314
1315            // Ensure that the source has actually shut down.
1316            _ = source_handle.await.unwrap();
1317
1318            // Stop the pump from sending lines forever.
1319            run_pump_atomic_sender.store(false, Ordering::Relaxed);
1320            assert!(pump_handle.join().is_ok());
1321        })
1322        .await;
1323    }
1324
1325    #[tokio::test]
1326    async fn multicast_udp_message() {
1327        assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async {
1328            let (tx, mut rx) = SourceSender::new_test();
1329            // The socket address must be `IPADDR_ANY` (0.0.0.0) in order to receive multicast packets
1330            let socket_address = next_addr_any();
1331            let multicast_ip_address: Ipv4Addr = "224.0.0.2".parse().unwrap();
1332            let multicast_socket_address =
1333                SocketAddr::new(IpAddr::V4(multicast_ip_address), socket_address.port());
1334            let mut config = UdpConfig::from_address(socket_address.into());
1335            config.multicast_groups = vec![multicast_ip_address];
1336            init_udp_with_config(tx, config).await;
1337
1338            // We must send packets to the same interface the `socket_address` is bound to
1339            // in order to receive the multicast packets the `from` socket sends.
1340            // To do so, we use the `IPADDR_ANY` address
1341            let from = next_addr_any();
1342            send_lines_udp_from(from, multicast_socket_address, ["test".to_string()]);
1343
1344            let event = rx.next().await.expect("must receive an event");
1345            assert_eq!(
1346                event.as_log()[log_schema().message_key().unwrap().to_string()],
1347                "test".into()
1348            );
1349        })
1350        .await;
1351    }
1352
1353    #[tokio::test]
1354    async fn multiple_multicast_addresses_udp_message() {
1355        assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async {
1356            let (tx, mut rx) = SourceSender::new_test();
1357            let socket_address = next_addr_any();
1358            let multicast_ip_addresses = (2..12)
1359                .map(|i| format!("224.0.0.{i}").parse().unwrap())
1360                .collect::<Vec<Ipv4Addr>>();
1361            let multicast_ip_socket_addresses = multicast_ip_addresses
1362                .iter()
1363                .map(|ip_address| SocketAddr::new(IpAddr::V4(*ip_address), socket_address.port()))
1364                .collect::<Vec<SocketAddr>>();
1365            let mut config = UdpConfig::from_address(socket_address.into());
1366            config.multicast_groups = multicast_ip_addresses;
1367            init_udp_with_config(tx, config).await;
1368
1369            let from = next_addr_any();
1370            for multicast_ip_socket_address in multicast_ip_socket_addresses {
1371                send_lines_udp_from(
1372                    from,
1373                    multicast_ip_socket_address,
1374                    [multicast_ip_socket_address.to_string()],
1375                );
1376
1377                let event = rx.next().await.expect("must receive an event");
1378                assert_eq!(
1379                    event.as_log()[log_schema().message_key().unwrap().to_string()],
1380                    multicast_ip_socket_address.to_string().into()
1381                );
1382            }
1383        })
1384        .await;
1385    }
1386
1387    #[tokio::test]
1388    async fn multicast_and_unicast_udp_message() {
1389        assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async {
1390            let (tx, mut rx) = SourceSender::new_test();
1391            let socket_address = next_addr_any();
1392            let multicast_ip_address: Ipv4Addr = "224.0.0.2".parse().unwrap();
1393            let multicast_socket_address =
1394                SocketAddr::new(IpAddr::V4(multicast_ip_address), socket_address.port());
1395            let mut config = UdpConfig::from_address(socket_address.into());
1396            config.multicast_groups = vec![multicast_ip_address];
1397            init_udp_with_config(tx, config).await;
1398
1399            let from = next_addr_any();
1400            // Send packet to multicast address
1401            send_lines_udp_from(from, multicast_socket_address, ["test".to_string()]);
1402            let event = rx.next().await.expect("must receive an event");
1403            assert_eq!(
1404                event.as_log()[log_schema().message_key().unwrap().to_string()],
1405                "test".into()
1406            );
1407
1408            // Windows does not support connecting to `0.0.0.0`,
1409            // therefore we connect to `127.0.0.1` instead (the socket is listening at `0.0.0.0`)
1410            let to = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), socket_address.port());
1411            // Send packet to unicast address
1412            send_lines_udp_from(from, to, ["test".to_string()]);
1413            let event = rx.next().await.expect("must receive an event");
1414            assert_eq!(
1415                event.as_log()[log_schema().message_key().unwrap().to_string()],
1416                "test".into()
1417            );
1418        })
1419        .await;
1420    }
1421
1422    #[tokio::test]
1423    async fn udp_invalid_multicast_group() {
1424        assert_source_error(&COMPONENT_ERROR_TAGS, async {
1425            let (tx, _rx) = SourceSender::new_test();
1426            let socket_address = next_addr_any();
1427            let invalid_multicast_ip_address: Ipv4Addr = "192.168.0.3".parse().unwrap();
1428            let mut config = UdpConfig::from_address(socket_address.into());
1429            config.multicast_groups = vec![invalid_multicast_ip_address];
1430            init_udp_with_config(tx, config).await;
1431        })
1432        .await;
1433    }
1434
1435    ////////////// UNIX TEST LIBS //////////////
1436
1437    #[cfg(unix)]
1438    async fn init_unix(sender: SourceSender, stream: bool, use_vector_namespace: bool) -> PathBuf {
1439        init_unix_inner(sender, stream, use_vector_namespace, None).await
1440    }
1441
1442    #[cfg(unix)]
1443    async fn init_unix_with_config(
1444        sender: SourceSender,
1445        stream: bool,
1446        use_vector_namespace: bool,
1447        config: UnixConfig,
1448    ) -> PathBuf {
1449        init_unix_inner(sender, stream, use_vector_namespace, Some(config)).await
1450    }
1451
1452    #[cfg(unix)]
1453    async fn init_unix_inner(
1454        sender: SourceSender,
1455        stream: bool,
1456        use_vector_namespace: bool,
1457        config: Option<UnixConfig>,
1458    ) -> PathBuf {
1459        let mut config = config.unwrap_or_else(|| {
1460            UnixConfig::new(tempfile::tempdir().unwrap().keep().join("unix_test"))
1461        });
1462
1463        let in_path = config.path.clone();
1464
1465        if use_vector_namespace {
1466            config.log_namespace = Some(true);
1467        }
1468
1469        let mode = if stream {
1470            Mode::UnixStream(config)
1471        } else {
1472            Mode::UnixDatagram(config)
1473        };
1474
1475        let server = SocketConfig { mode }
1476            .build(SourceContext::new_test(sender, None))
1477            .await
1478            .unwrap();
1479        tokio::spawn(server);
1480
1481        // Wait for server to accept traffic
1482        while if stream {
1483            std::os::unix::net::UnixStream::connect(&in_path).is_err()
1484        } else {
1485            let socket = std::os::unix::net::UnixDatagram::unbound().unwrap();
1486            socket.connect(&in_path).is_err()
1487        } {
1488            yield_now().await;
1489        }
1490
1491        in_path
1492    }
1493
1494    #[cfg(unix)]
1495    async fn unix_send_lines(stream: bool, path: PathBuf, lines: &[&str]) {
1496        match stream {
1497            false => send_lines_unix_datagram(path, lines).await,
1498            true => send_lines_unix_stream(path, lines).await,
1499        }
1500    }
1501
1502    #[cfg(unix)]
1503    async fn unix_message(
1504        message: &str,
1505        stream: bool,
1506        use_vector_namespace: bool,
1507    ) -> (PathBuf, impl Stream<Item = Event> + use<>) {
1508        let (tx, rx) = SourceSender::new_test();
1509        let path = init_unix(tx, stream, use_vector_namespace).await;
1510        let path_clone = path.clone();
1511
1512        unix_send_lines(stream, path, &[message]).await;
1513
1514        (path_clone, rx)
1515    }
1516
1517    #[cfg(unix)]
1518    async fn unix_multiple_packets(stream: bool) {
1519        let (tx, rx) = SourceSender::new_test();
1520        let path = init_unix(tx, stream, false).await;
1521
1522        unix_send_lines(stream, path, &["test", "test2"]).await;
1523        let events = collect_n(rx, 2).await;
1524
1525        assert_eq!(2, events.len());
1526        assert_eq!(
1527            events[0].as_log()[log_schema().message_key().unwrap().to_string()],
1528            "test".into()
1529        );
1530        assert_eq!(
1531            events[1].as_log()[log_schema().message_key().unwrap().to_string()],
1532            "test2".into()
1533        );
1534    }
1535
1536    #[cfg(unix)]
1537    fn parses_unix_config(mode: &str) -> SocketConfig {
1538        toml::from_str::<SocketConfig>(&format!(
1539            r#"
1540               mode = "{mode}"
1541               path = "/does/not/exist"
1542            "#
1543        ))
1544        .unwrap()
1545    }
1546
1547    #[cfg(unix)]
1548    fn parses_unix_config_file_mode(mode: &str) -> SocketConfig {
1549        toml::from_str::<SocketConfig>(&format!(
1550            r#"
1551               mode = "{mode}"
1552               path = "/does/not/exist"
1553               socket_file_mode = 0o777
1554            "#
1555        ))
1556        .unwrap()
1557    }
1558
1559    ////////////// UNIX DATAGRAM TESTS //////////////
1560    #[cfg(unix)]
1561    async fn send_lines_unix_datagram(path: PathBuf, lines: &[&str]) {
1562        let packets = lines.iter().map(|line| Bytes::from(line.to_string()));
1563        send_packets_unix_datagram(path, packets).await;
1564    }
1565
1566    #[cfg(unix)]
1567    async fn send_packets_unix_datagram(path: PathBuf, packets: impl IntoIterator<Item = Bytes>) {
1568        let socket = UnixDatagram::unbound().unwrap();
1569        socket.connect(path).unwrap();
1570
1571        for packet in packets {
1572            socket.send(&packet).await.unwrap();
1573        }
1574        socket.shutdown(std::net::Shutdown::Both).unwrap();
1575    }
1576
1577    #[cfg(unix)]
1578    #[tokio::test]
1579    async fn unix_datagram_message() {
1580        assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async {
1581            let (_, rx) = unix_message("test", false, false).await;
1582            let events = collect_n(rx, 1).await;
1583
1584            assert_eq!(events.len(), 1);
1585            assert_eq!(
1586                events[0].as_log()[log_schema().message_key().unwrap().to_string()],
1587                "test".into()
1588            );
1589            assert_eq!(
1590                events[0].as_log()[log_schema().source_type_key().unwrap().to_string()],
1591                "socket".into()
1592            );
1593            assert_eq!(events[0].as_log()["host"], UNNAMED_SOCKET_HOST.into());
1594        })
1595        .await;
1596    }
1597
1598    #[ignore]
1599    #[cfg(unix)]
1600    #[tokio::test]
1601    async fn unix_datagram_socket_test() {
1602        // This test is useful for testing the behavior of datagram
1603        // sockets.
1604
1605        use tempfile::tempdir;
1606        use tokio::net::UnixDatagram;
1607
1608        let tmp = tempdir().unwrap();
1609
1610        let tx_path = tmp.path().join("tx");
1611
1612        // Switch this var between "bound" and "unbound" to test
1613        // different types of socket behavior.
1614        let tx_type = "bound";
1615
1616        let tx = if tx_type == "bound" {
1617            UnixDatagram::bind(&tx_path).unwrap()
1618        } else {
1619            UnixDatagram::unbound().unwrap()
1620        };
1621
1622        // The following debug statements showcase some useful info:
1623        // dbg!(tx.local_addr().unwrap());
1624        // dbg!(std::os::unix::prelude::AsRawFd::as_raw_fd(&tx));
1625
1626        // Create another, bound socket
1627        let rx_path = tmp.path().join("rx");
1628        let rx = UnixDatagram::bind(&rx_path).unwrap();
1629
1630        // Connect to the bound socket
1631        tx.connect(&rx_path).unwrap();
1632
1633        // Send to the bound socket
1634        let bytes = b"hello world";
1635        tx.send(bytes).await.unwrap();
1636
1637        let mut buf = vec![0u8; 24];
1638        let (size, _) = rx.recv_from(&mut buf).await.unwrap();
1639
1640        let dgram = &buf[..size];
1641        assert_eq!(dgram, bytes);
1642    }
1643
1644    #[cfg(unix)]
1645    #[tokio::test]
1646    async fn unix_datagram_chunked_gelf_messages() {
1647        assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async {
1648            let (tx, rx) = SourceSender::new_test();
1649            let in_path = tempfile::tempdir().unwrap().keep().join("unix_test");
1650            let mut config = UnixConfig::new(in_path.clone());
1651            config.decoding = GelfDeserializerConfig::default().into();
1652            let path = init_unix_with_config(tx, false, false, config).await;
1653            let seed = 42;
1654            let mut rng = SmallRng::seed_from_u64(seed);
1655            let max_size = 20;
1656            let big_message = "This is a very large message".repeat(5);
1657            let another_big_message = "This is another very large message".repeat(5);
1658            let mut chunks = get_gelf_chunks(big_message.as_str(), max_size, &mut rng);
1659            let mut another_chunks =
1660                get_gelf_chunks(another_big_message.as_str(), max_size, &mut rng);
1661            chunks.append(&mut another_chunks);
1662            chunks.shuffle(&mut rng);
1663
1664            send_packets_unix_datagram(path, chunks).await;
1665
1666            let events = collect_n(rx, 2).await;
1667            assert_eq!(
1668                events[0].as_log()[log_schema().message_key().unwrap().to_string()],
1669                big_message.into()
1670            );
1671            assert_eq!(
1672                events[1].as_log()[log_schema().message_key().unwrap().to_string()],
1673                another_big_message.into()
1674            );
1675        })
1676        .await;
1677    }
1678
1679    #[cfg(unix)]
1680    #[tokio::test]
1681    async fn unix_datagram_message_with_vector_namespace() {
1682        assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async {
1683            let (_, rx) = unix_message("test", false, true).await;
1684            let events = collect_n(rx, 1).await;
1685            let log = events[0].as_log();
1686            let event_meta = log.metadata().value();
1687
1688            assert_eq!(log.value(), &"test".into());
1689            assert_eq!(events.len(), 1);
1690
1691            assert_eq!(
1692                event_meta.get(path!("vector", "source_type")).unwrap(),
1693                &value!(SocketConfig::NAME)
1694            );
1695
1696            assert_eq!(
1697                event_meta.get(path!(SocketConfig::NAME, "host")).unwrap(),
1698                &value!(UNNAMED_SOCKET_HOST)
1699            );
1700        })
1701        .await;
1702    }
1703
1704    #[cfg(unix)]
1705    #[tokio::test]
1706    async fn unix_datagram_message_preserves_newline() {
1707        assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async {
1708            let (_, rx) = unix_message("foo\nbar", false, false).await;
1709            let events = collect_n(rx, 1).await;
1710
1711            assert_eq!(events.len(), 1);
1712            assert_eq!(
1713                events[0].as_log()[log_schema().message_key().unwrap().to_string()],
1714                "foo\nbar".into()
1715            );
1716            assert_eq!(
1717                events[0].as_log()[log_schema().source_type_key().unwrap().to_string()],
1718                "socket".into()
1719            );
1720        })
1721        .await;
1722    }
1723
1724    #[cfg(unix)]
1725    #[tokio::test]
1726    async fn unix_datagram_multiple_packets() {
1727        assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async {
1728            unix_multiple_packets(false).await
1729        })
1730        .await;
1731    }
1732
1733    #[cfg(unix)]
1734    #[test]
1735    fn parses_unix_datagram_config() {
1736        let config = parses_unix_config("unix_datagram");
1737        assert!(matches!(config.mode, Mode::UnixDatagram { .. }));
1738    }
1739
1740    #[cfg(unix)]
1741    #[test]
1742    fn parses_unix_datagram_perms() {
1743        let config = parses_unix_config_file_mode("unix_datagram");
1744        assert!(matches!(config.mode, Mode::UnixDatagram { .. }));
1745    }
1746
1747    #[cfg(unix)]
1748    #[tokio::test]
1749    async fn unix_datagram_permissions() {
1750        let in_path = tempfile::tempdir().unwrap().keep().join("unix_test");
1751        let (tx, _) = SourceSender::new_test();
1752
1753        let mut config = UnixConfig::new(in_path.clone());
1754        config.socket_file_mode = Some(0o555);
1755        let mode = Mode::UnixDatagram(config);
1756        let server = SocketConfig { mode }
1757            .build(SourceContext::new_test(tx, None))
1758            .await
1759            .unwrap();
1760        tokio::spawn(server);
1761
1762        wait_for(|| {
1763            match std::fs::metadata(&in_path) {
1764                Ok(meta) => {
1765                    match meta.permissions().mode() {
1766                        // S_IFSOCK   0140000   socket
1767                        0o140555 => ready(true),
1768                        _ => ready(false),
1769                    }
1770                }
1771                Err(_) => ready(false),
1772            }
1773        })
1774        .await;
1775    }
1776
1777    ////////////// UNIX STREAM TESTS //////////////
1778    #[cfg(unix)]
1779    async fn send_lines_unix_stream(path: PathBuf, lines: &[&str]) {
1780        let socket = UnixStream::connect(path).await.unwrap();
1781        let mut sink = FramedWrite::new(socket, LinesCodec::new());
1782
1783        let lines = lines.iter().map(|s| Ok(s.to_string()));
1784        let lines = lines.collect::<Vec<_>>();
1785        sink.send_all(&mut stream::iter(lines)).await.unwrap();
1786
1787        let mut socket = sink.into_inner();
1788        socket.shutdown().await.unwrap();
1789    }
1790
1791    #[cfg(unix)]
1792    #[tokio::test]
1793    async fn unix_stream_message() {
1794        assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async {
1795            let (_, rx) = unix_message("test", true, false).await;
1796            let events = collect_n(rx, 1).await;
1797
1798            assert_eq!(1, events.len());
1799            assert_eq!(
1800                events[0].as_log()[log_schema().message_key().unwrap().to_string()],
1801                "test".into()
1802            );
1803            assert_eq!(
1804                events[0].as_log()[log_schema().source_type_key().unwrap().to_string()],
1805                "socket".into()
1806            );
1807        })
1808        .await;
1809    }
1810
1811    #[cfg(unix)]
1812    #[tokio::test]
1813    async fn unix_stream_message_with_vector_namespace() {
1814        assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async {
1815            let (_, rx) = unix_message("test", true, true).await;
1816            let events = collect_n(rx, 1).await;
1817            let log = events[0].as_log();
1818            let event_meta = log.metadata().value();
1819
1820            assert_eq!(log.value(), &"test".into());
1821            assert_eq!(1, events.len());
1822            assert_eq!(
1823                event_meta.get(path!("vector", "source_type")).unwrap(),
1824                &value!(SocketConfig::NAME)
1825            );
1826            assert_eq!(
1827                event_meta.get(path!(SocketConfig::NAME, "host")).unwrap(),
1828                &value!(UNNAMED_SOCKET_HOST)
1829            );
1830        })
1831        .await;
1832    }
1833
1834    #[cfg(unix)]
1835    #[tokio::test]
1836    async fn unix_stream_message_splits_on_newline() {
1837        assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async {
1838            let (_, rx) = unix_message("foo\nbar", true, false).await;
1839            let events = collect_n(rx, 2).await;
1840
1841            assert_eq!(events.len(), 2);
1842            assert_eq!(
1843                events[0].as_log()[log_schema().message_key().unwrap().to_string()],
1844                "foo".into()
1845            );
1846            assert_eq!(
1847                events[0].as_log()[log_schema().source_type_key().unwrap().to_string()],
1848                "socket".into()
1849            );
1850            assert_eq!(
1851                events[1].as_log()[log_schema().message_key().unwrap().to_string()],
1852                "bar".into()
1853            );
1854            assert_eq!(
1855                events[1].as_log()[log_schema().source_type_key().unwrap().to_string()],
1856                "socket".into()
1857            );
1858        })
1859        .await;
1860    }
1861
1862    #[cfg(unix)]
1863    #[tokio::test]
1864    async fn unix_stream_multiple_packets() {
1865        assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async {
1866            unix_multiple_packets(true).await
1867        })
1868        .await;
1869    }
1870
1871    #[cfg(unix)]
1872    #[test]
1873    fn parses_new_unix_stream_config() {
1874        let config = parses_unix_config("unix_stream");
1875        assert!(matches!(config.mode, Mode::UnixStream { .. }));
1876    }
1877
1878    #[cfg(unix)]
1879    #[test]
1880    fn parses_new_unix_datagram_perms() {
1881        let config = parses_unix_config_file_mode("unix_stream");
1882        assert!(matches!(config.mode, Mode::UnixStream { .. }));
1883    }
1884
1885    #[cfg(unix)]
1886    #[test]
1887    fn parses_old_unix_stream_config() {
1888        let config = parses_unix_config("unix");
1889        assert!(matches!(config.mode, Mode::UnixStream { .. }));
1890    }
1891
1892    #[cfg(unix)]
1893    #[tokio::test]
1894    async fn unix_stream_permissions() {
1895        let in_path = tempfile::tempdir().unwrap().keep().join("unix_test");
1896        let (tx, _) = SourceSender::new_test();
1897
1898        let mut config = UnixConfig::new(in_path.clone());
1899        config.socket_file_mode = Some(0o421);
1900        let mode = Mode::UnixStream(config);
1901        let server = SocketConfig { mode }
1902            .build(SourceContext::new_test(tx, None))
1903            .await
1904            .unwrap();
1905        tokio::spawn(server);
1906
1907        wait_for(|| {
1908            match std::fs::metadata(&in_path) {
1909                Ok(meta) => {
1910                    match meta.permissions().mode() {
1911                        // S_IFSOCK   0140000   socket
1912                        0o140421 => ready(true),
1913                        _ => ready(false),
1914                    }
1915                }
1916                Err(_) => ready(false),
1917            }
1918        })
1919        .await;
1920    }
1921}