vector/sinks/
socket.rs

1use vector_lib::{
2    codecs::{
3        TextSerializerConfig,
4        encoding::{Framer, FramingConfig},
5    },
6    configurable::configurable_component,
7};
8
9#[cfg(not(windows))]
10use crate::sinks::util::unix::UnixSinkConfig;
11use crate::{
12    codecs::{Encoder, EncodingConfig, EncodingConfigWithFraming, SinkType},
13    config::{AcknowledgementsConfig, GenerateConfig, Input, SinkConfig, SinkContext},
14    sinks::util::{tcp::TcpSinkConfig, udp::UdpSinkConfig},
15};
16
17/// Configuration for the `socket` sink.
18#[configurable_component(sink("socket", "Deliver logs to a remote socket endpoint."))]
19#[derive(Clone, Debug)]
20pub struct SocketSinkConfig {
21    #[serde(flatten)]
22    pub mode: Mode,
23
24    #[configurable(derived)]
25    #[serde(
26        default,
27        deserialize_with = "crate::serde::bool_or_struct",
28        skip_serializing_if = "crate::serde::is_default"
29    )]
30    pub acknowledgements: AcknowledgementsConfig,
31}
32
33/// Socket mode.
34#[configurable_component]
35#[derive(Clone, Debug)]
36#[serde(tag = "mode", rename_all = "snake_case")]
37#[configurable(metadata(docs::enum_tag_description = "The type of socket to use."))]
38pub enum Mode {
39    /// Send over TCP.
40    Tcp(TcpMode),
41
42    /// Send over UDP.
43    Udp(UdpMode),
44
45    /// Send over a Unix domain socket (UDS), in stream mode.
46    #[serde(alias = "unix")]
47    UnixStream(UnixMode),
48
49    /// Send over a Unix domain socket (UDS), in datagram mode.
50    /// Unavailable on macOS, due to send(2)'s apparent non-blocking behavior,
51    /// resulting in ENOBUFS errors which we currently don't handle.
52    UnixDatagram(UnixMode),
53}
54
55/// TCP configuration.
56#[configurable_component]
57#[derive(Clone, Debug)]
58pub struct TcpMode {
59    #[serde(flatten)]
60    config: TcpSinkConfig,
61
62    #[serde(flatten)]
63    encoding: EncodingConfigWithFraming,
64}
65
66/// UDP configuration.
67#[configurable_component]
68#[derive(Clone, Debug)]
69pub struct UdpMode {
70    #[serde(flatten)]
71    config: UdpSinkConfig,
72
73    #[configurable(derived)]
74    encoding: EncodingConfig,
75}
76
77/// Unix Domain Socket configuration.
78#[configurable_component]
79#[derive(Clone, Debug)]
80pub struct UnixMode {
81    #[serde(flatten)]
82    config: UnixSinkConfig,
83
84    #[serde(flatten)]
85    encoding: EncodingConfigWithFraming,
86}
87
// Workaround for https://github.com/vectordotdev/vector/issues/22198.
//
// Stand-in definition compiled only on Windows, where the `UnixSinkConfig` import at the
// top of this file is cfg'd out; it keeps `UnixMode` well-formed on that target.
/// A Unix Domain Socket sink.
#[cfg(windows)]
#[configurable_component]
#[derive(Debug, Clone)]
pub struct UnixSinkConfig {
    /// The Unix socket path.
    ///
    /// This should be an absolute path.
    #[configurable(metadata(docs::examples = "/path/to/socket"))]
    pub path: std::path::PathBuf,
}
100
101impl GenerateConfig for SocketSinkConfig {
102    fn generate_config() -> toml::Value {
103        toml::from_str(
104            r#"address = "92.12.333.224:5000"
105            mode = "tcp"
106            encoding.codec = "json""#,
107        )
108        .unwrap()
109    }
110}
111
112impl SocketSinkConfig {
113    pub const fn new(mode: Mode, acknowledgements: AcknowledgementsConfig) -> Self {
114        SocketSinkConfig {
115            mode,
116            acknowledgements,
117        }
118    }
119
120    pub fn make_basic_tcp_config(
121        address: String,
122        acknowledgements: AcknowledgementsConfig,
123    ) -> Self {
124        Self::new(
125            Mode::Tcp(TcpMode {
126                config: TcpSinkConfig::from_address(address),
127                encoding: (None::<FramingConfig>, TextSerializerConfig::default()).into(),
128            }),
129            acknowledgements,
130        )
131    }
132}
133
134#[async_trait::async_trait]
135#[typetag::serde(name = "socket")]
136impl SinkConfig for SocketSinkConfig {
137    async fn build(
138        &self,
139        _cx: SinkContext,
140    ) -> crate::Result<(super::VectorSink, super::Healthcheck)> {
141        match &self.mode {
142            Mode::Tcp(TcpMode { config, encoding }) => {
143                let transformer = encoding.transformer();
144                let (framer, serializer) = encoding.build(SinkType::StreamBased)?;
145                let encoder = Encoder::<Framer>::new(framer, serializer);
146                config.build(transformer, encoder)
147            }
148            Mode::Udp(UdpMode { config, encoding }) => {
149                let transformer = encoding.transformer();
150                let serializer = encoding.build()?;
151                let chunker = serializer.chunker();
152                let encoder = Encoder::<()>::new(serializer);
153                config.build(transformer, encoder, chunker)
154            }
155            #[cfg(unix)]
156            Mode::UnixStream(UnixMode { config, encoding }) => {
157                let transformer = encoding.transformer();
158                let (framer, serializer) = encoding.build(SinkType::StreamBased)?;
159                let encoder = Encoder::<Framer>::new(framer, serializer);
160                config.build(
161                    transformer,
162                    encoder,
163                    super::util::service::net::UnixMode::Stream,
164                )
165            }
166            #[allow(unused)]
167            #[cfg(unix)]
168            Mode::UnixDatagram(UnixMode { config, encoding }) => {
169                cfg_if! {
170                    if #[cfg(not(target_os = "macos"))] {
171                        let transformer = encoding.transformer();
172                        let (framer, serializer) = encoding.build(SinkType::StreamBased)?;
173                        let encoder = Encoder::<Framer>::new(framer, serializer);
174                        config.build(
175                            transformer,
176                            encoder,
177                            super::util::service::net::UnixMode::Datagram,
178                        )
179                    }
180                    else {
181                        Err("UnixDatagram is not available on macOS platforms.".into())
182                    }
183                }
184            }
185            #[cfg(not(unix))]
186            Mode::UnixStream(_) | Mode::UnixDatagram(_) => {
187                Err("Unix modes are supported only on Unix platforms.".into())
188            }
189        }
190    }
191
192    fn input(&self) -> Input {
193        let encoder_input_type = match &self.mode {
194            Mode::Tcp(TcpMode { encoding, .. }) => encoding.config().1.input_type(),
195            Mode::Udp(UdpMode { encoding, .. }) => encoding.config().input_type(),
196            Mode::UnixStream(UnixMode { encoding, .. }) => encoding.config().1.input_type(),
197            Mode::UnixDatagram(UnixMode { encoding, .. }) => encoding.config().1.input_type(),
198        };
199        Input::new(encoder_input_type)
200    }
201
202    fn acknowledgements(&self) -> &AcknowledgementsConfig {
203        &self.acknowledgements
204    }
205}
206
207#[cfg(test)]
208mod test {
209    use std::{
210        future::ready,
211        net::{SocketAddr, UdpSocket},
212    };
213
214    #[cfg(target_os = "windows")]
215    use cfg_if::cfg_if;
216    use futures::stream::StreamExt;
217    use futures_util::stream;
218    use serde_json::Value;
219    use tokio::{
220        net::TcpListener,
221        time::{Duration, sleep, timeout},
222    };
223    use tokio_stream::wrappers::TcpListenerStream;
224    use tokio_util::codec::{FramedRead, LinesCodec};
225    use vector_lib::codecs::JsonSerializerConfig;
226
227    use super::*;
228    cfg_if! { if #[cfg(unix)] {
229        use vector_lib::codecs::NativeJsonSerializerConfig;
230        use crate::test_util::random_metrics_with_stream;
231        use std::path::PathBuf;
232    } }
233    #[cfg(all(unix, not(target_os = "macos")))]
234    use std::os::unix::net::UnixDatagram;
235
236    use crate::{
237        config::SinkContext,
238        event::{Event, LogEvent},
239        test_util::{
240            CountReceiver,
241            addr::{next_addr, next_addr_v6},
242            components::{SINK_TAGS, assert_sink_compliance, run_and_assert_sink_compliance},
243            random_lines_with_stream, trace_init,
244        },
245    };
246
247    #[test]
248    fn generate_config() {
249        crate::test_util::test_generate_config::<SocketSinkConfig>();
250    }
251
252    enum DatagramSocket {
253        Udp(UdpSocket),
254        #[cfg(all(unix, not(target_os = "macos")))]
255        Unix(UnixDatagram),
256    }
257
258    enum DatagramSocketAddr {
259        Udp(SocketAddr),
260        #[cfg(all(unix, not(target_os = "macos")))]
261        Unix(PathBuf),
262    }
263
264    async fn test_datagram(datagram_addr: DatagramSocketAddr) {
265        let receiver = match &datagram_addr {
266            DatagramSocketAddr::Udp(addr) => DatagramSocket::Udp(UdpSocket::bind(addr).unwrap()),
267            #[cfg(all(unix, not(target_os = "macos")))]
268            DatagramSocketAddr::Unix(path) => {
269                DatagramSocket::Unix(UnixDatagram::bind(path).unwrap())
270            }
271        };
272
273        let config = SocketSinkConfig {
274            mode: match &datagram_addr {
275                DatagramSocketAddr::Udp(addr) => Mode::Udp(UdpMode {
276                    config: UdpSinkConfig::from_address(addr.to_string()),
277                    encoding: JsonSerializerConfig::default().into(),
278                }),
279                #[cfg(all(unix, not(target_os = "macos")))]
280                DatagramSocketAddr::Unix(path) => Mode::UnixDatagram(UnixMode {
281                    config: UnixSinkConfig::new(path.to_path_buf()),
282                    encoding: (None::<FramingConfig>, JsonSerializerConfig::default()).into(),
283                }),
284            },
285            acknowledgements: Default::default(),
286        };
287
288        let context = SinkContext::default();
289        assert_sink_compliance(&SINK_TAGS, async move {
290            let (sink, _healthcheck) = config.build(context).await.unwrap();
291
292            let event = Event::Log(LogEvent::from("raw log line"));
293            sink.run(stream::once(ready(event.into()))).await
294        })
295        .await
296        .expect("Running sink failed");
297
298        let mut buf = [0; 256];
299        let size = match &receiver {
300            DatagramSocket::Udp(sock) => {
301                sock.recv_from(&mut buf).expect("Did not receive message").0
302            }
303            #[cfg(all(unix, not(target_os = "macos")))]
304            DatagramSocket::Unix(sock) => sock.recv(&mut buf).expect("Did not receive message"),
305        };
306
307        let packet = String::from_utf8(buf[..size].to_vec()).expect("Invalid data received");
308        let data = serde_json::from_str::<Value>(&packet).expect("Invalid JSON received");
309        let data = data.as_object().expect("Not a JSON object");
310        assert!(data.get("timestamp").is_some());
311        let message = data.get("message").expect("No message in JSON");
312        assert_eq!(message, &Value::String("raw log line".into()));
313    }
314
315    #[tokio::test]
316    async fn udp_ipv4() {
317        trace_init();
318
319        let (_guard, addr) = next_addr();
320        test_datagram(DatagramSocketAddr::Udp(addr)).await;
321    }
322
323    #[tokio::test]
324    async fn udp_ipv6() {
325        trace_init();
326
327        let (_guard, addr) = next_addr_v6();
328        test_datagram(DatagramSocketAddr::Udp(addr)).await;
329    }
330
331    #[cfg(all(unix, not(target_os = "macos")))]
332    #[tokio::test]
333    async fn unix_datagram() {
334        trace_init();
335
336        test_datagram(DatagramSocketAddr::Unix(temp_uds_path(
337            "unix_datagram_socket_test",
338        )))
339        .await;
340    }
341
342    #[tokio::test]
343    async fn tcp_stream() {
344        trace_init();
345
346        let (_guard, addr) = next_addr();
347        let config = SocketSinkConfig {
348            mode: Mode::Tcp(TcpMode {
349                config: TcpSinkConfig::from_address(addr.to_string()),
350                encoding: (None::<FramingConfig>, JsonSerializerConfig::default()).into(),
351            }),
352            acknowledgements: Default::default(),
353        };
354
355        let mut receiver = CountReceiver::receive_lines(addr);
356
357        let (lines, events) = random_lines_with_stream(10, 100, None);
358
359        assert_sink_compliance(&SINK_TAGS, async move {
360            let context = SinkContext::default();
361            let (sink, _healthcheck) = config.build(context).await.unwrap();
362
363            sink.run(events).await
364        })
365        .await
366        .expect("Running sink failed");
367
368        // Wait for output to connect
369        receiver.connected().await;
370
371        let output = receiver.await;
372        assert_eq!(lines.len(), output.len());
373        for (source, received) in lines.iter().zip(output) {
374            let json = serde_json::from_str::<Value>(&received).expect("Invalid JSON");
375            let received = json.get("message").unwrap().as_str().unwrap();
376            assert_eq!(source, received);
377        }
378    }
379
380    #[cfg(unix)]
381    #[tokio::test]
382    async fn metrics_socket() {
383        trace_init();
384
385        let out_path = temp_uds_path("unix_socket_test");
386        let mut receiver = CountReceiver::receive_lines_unix(out_path.clone());
387
388        let config = SocketSinkConfig {
389            mode: Mode::UnixStream(UnixMode {
390                config: UnixSinkConfig::new(out_path),
391                encoding: (None::<FramingConfig>, NativeJsonSerializerConfig).into(),
392            }),
393            acknowledgements: Default::default(),
394        };
395
396        let (expected, events) = random_metrics_with_stream(10, None, None);
397
398        assert_sink_compliance(&SINK_TAGS, async move {
399            let context = SinkContext::default();
400            let (sink, _healthcheck) = config.build(context).await.unwrap();
401
402            sink.run(events).await
403        })
404        .await
405        .expect("Running sink failed");
406
407        // Wait for output to connect
408        receiver.connected().await;
409
410        let output = receiver.await;
411        assert_eq!(expected.len(), output.len());
412        for (source, received) in expected.iter().zip(output) {
413            let json = serde_json::from_str::<Value>(&received).expect("Invalid JSON");
414            let received = json.get("metric").unwrap();
415            let received_name = received.get("name").unwrap().as_str().unwrap();
416            assert_eq!(source.as_metric().name(), received_name);
417        }
418    }
419
420    // This is a test that checks that we properly receive all events in the
421    // case of a proper server side write side shutdown.
422    //
423    // This test basically sends 10 events, shuts down the server and forces a
424    // reconnect. It then forces another 10 events through and we should get a
425    // total of 20 events.
426    //
427    // If this test hangs that means somewhere we are not collecting the correct
428    // events.
429    #[tokio::test]
430    async fn tcp_stream_detects_disconnect() {
431        use std::{
432            pin::Pin,
433            sync::{
434                Arc,
435                atomic::{AtomicUsize, Ordering},
436            },
437            task::Poll,
438        };
439
440        use futures::{FutureExt, SinkExt, StreamExt, channel::mpsc};
441        use tokio::{
442            io::{AsyncRead, AsyncWriteExt, ReadBuf},
443            net::TcpStream,
444            task::yield_now,
445            time::{Duration, interval},
446        };
447        use tokio_stream::wrappers::IntervalStream;
448
449        use crate::{
450            event::EventArray,
451            tls::{self, MaybeTlsIncomingStream, MaybeTlsSettings, TlsConfig, TlsEnableableConfig},
452        };
453
454        trace_init();
455
456        let (_guard, addr) = next_addr();
457        let config = SocketSinkConfig {
458            mode: Mode::Tcp(TcpMode {
459                config: TcpSinkConfig::new(
460                    addr.to_string(),
461                    None,
462                    Some(TlsEnableableConfig {
463                        enabled: Some(true),
464                        options: TlsConfig {
465                            verify_certificate: Some(false),
466                            verify_hostname: Some(false),
467                            ca_file: Some(tls::TEST_PEM_CRT_PATH.into()),
468                            ..Default::default()
469                        },
470                    }),
471                    None,
472                ),
473                encoding: (None::<FramingConfig>, TextSerializerConfig::default()).into(),
474            }),
475            acknowledgements: Default::default(),
476        };
477        let context = SinkContext::default();
478        let (sink, _healthcheck) = config.build(context).await.unwrap();
479        let (mut sender, receiver) = mpsc::channel::<Option<EventArray>>(0);
480        let jh1 = tokio::spawn(async move {
481            let stream = receiver
482                .take_while(|event| ready(event.is_some()))
483                .map(|event| event.unwrap())
484                .boxed();
485            run_and_assert_sink_compliance(sink, stream, &SINK_TAGS).await
486        });
487
488        let msg_counter = Arc::new(AtomicUsize::new(0));
489        let msg_counter1 = Arc::clone(&msg_counter);
490        let conn_counter = Arc::new(AtomicUsize::new(0));
491        let conn_counter1 = Arc::clone(&conn_counter);
492
493        let (close_tx, close_rx) = tokio::sync::oneshot::channel::<()>();
494        let mut close_rx = Some(close_rx.map(|x| x.unwrap()));
495
496        let config = Some(TlsEnableableConfig::test_config());
497
498        // Only accept two connections.
499        let jh2 = tokio::spawn(async move {
500            let tls = MaybeTlsSettings::from_config(config.as_ref(), true).unwrap();
501            let listener = tls.bind(&addr).await.unwrap();
502            listener
503                .accept_stream()
504                .take(2)
505                .for_each(|connection| {
506                    let mut close_rx = close_rx.take();
507
508                    conn_counter1.fetch_add(1, Ordering::SeqCst);
509                    let msg_counter1 = Arc::clone(&msg_counter1);
510
511                    let mut stream: MaybeTlsIncomingStream<TcpStream> = connection.unwrap();
512
513                    std::future::poll_fn(move |cx| {
514                        loop {
515                            if let Some(fut) = close_rx.as_mut()
516                                && let Poll::Ready(()) = fut.poll_unpin(cx)
517                            {
518                                stream
519                                    .get_mut()
520                                    .unwrap()
521                                    .shutdown()
522                                    .now_or_never()
523                                    .unwrap()
524                                    .unwrap();
525                                close_rx = None;
526                            }
527
528                            let mut buf = [0u8; 11];
529                            let mut buf = ReadBuf::new(&mut buf);
530                            return match Pin::new(&mut stream).poll_read(cx, &mut buf) {
531                                Poll::Ready(Ok(())) => {
532                                    if buf.filled().is_empty() {
533                                        Poll::Ready(())
534                                    } else {
535                                        msg_counter1.fetch_add(1, Ordering::SeqCst);
536                                        continue;
537                                    }
538                                }
539                                Poll::Ready(Err(error)) => panic!("{error}"),
540                                Poll::Pending => Poll::Pending,
541                            };
542                        }
543                    })
544                })
545                .await;
546        });
547
548        let (_, mut events) = random_lines_with_stream(10, 10, None);
549        while let Some(event) = events.next().await {
550            sender.send(Some(event)).await.unwrap();
551        }
552
553        // Loop and check for 10 events, we should always get 10 events. Once,
554        // we have 10 events we can tell the server to shutdown to simulate the
555        // remote shutting down on an idle connection.
556        IntervalStream::new(interval(Duration::from_millis(100)))
557            .take(500)
558            .take_while(|_| ready(msg_counter.load(Ordering::SeqCst) != 10))
559            .for_each(|_| ready(()))
560            .await;
561        close_tx.send(()).unwrap();
562
563        // Close connection in spawned future
564        yield_now().await;
565
566        assert_eq!(msg_counter.load(Ordering::SeqCst), 10);
567        assert_eq!(conn_counter.load(Ordering::SeqCst), 1);
568
569        // Send another 10 events
570        let (_, mut events) = random_lines_with_stream(10, 10, None);
571        while let Some(event) = events.next().await {
572            sender.send(Some(event)).await.unwrap();
573        }
574
575        // Wait for server task to be complete.
576        sender.send(None).await.unwrap();
577        jh1.await.unwrap();
578        jh2.await.unwrap();
579
580        // Check that there are exactly 20 events.
581        assert_eq!(msg_counter.load(Ordering::SeqCst), 20);
582        assert_eq!(conn_counter.load(Ordering::SeqCst), 2);
583    }
584
585    /// Tests whether socket recovers from a hard disconnect.
586    #[tokio::test]
587    async fn reconnect() {
588        trace_init();
589
590        let (_guard, addr) = next_addr();
591        let config = SocketSinkConfig {
592            mode: Mode::Tcp(TcpMode {
593                config: TcpSinkConfig::from_address(addr.to_string()),
594                encoding: (None::<FramingConfig>, TextSerializerConfig::default()).into(),
595            }),
596            acknowledgements: Default::default(),
597        };
598
599        let context = SinkContext::default();
600        let (sink, _healthcheck) = config.build(context).await.unwrap();
601
602        let (_, events) = random_lines_with_stream(1000, 10000, None);
603        let sink_handle = tokio::spawn(run_and_assert_sink_compliance(sink, events, &SINK_TAGS));
604
605        // First listener
606        let mut count = 20usize;
607        TcpListenerStream::new(TcpListener::bind(addr).await.unwrap())
608            .next()
609            .await
610            .unwrap()
611            .map(|socket| FramedRead::new(socket, LinesCodec::new()))
612            .unwrap()
613            .map(|x| x.unwrap())
614            .take_while(|_| {
615                ready(if count > 0 {
616                    count -= 1;
617                    true
618                } else {
619                    false
620                })
621            })
622            .collect::<Vec<_>>()
623            .await;
624
625        // Disconnect
626        if cfg!(windows) {
627            // Gives Windows time to release the addr port.
628            sleep(Duration::from_secs(1)).await;
629        }
630
631        // Second listener
632        // If this doesn't succeed then the sink hanged.
633        assert!(
634            timeout(
635                Duration::from_secs(5),
636                CountReceiver::receive_lines(addr).connected()
637            )
638            .await
639            .is_ok()
640        );
641
642        sink_handle.await.unwrap();
643    }
644
645    #[cfg(unix)]
646    fn temp_uds_path(name: &str) -> PathBuf {
647        tempfile::tempdir().unwrap().keep().join(name)
648    }
649}