vector/sinks/
socket.rs

1use vector_lib::{
2    codecs::{
3        TextSerializerConfig,
4        encoding::{Framer, FramingConfig},
5    },
6    configurable::configurable_component,
7};
8
9#[cfg(not(windows))]
10use crate::sinks::util::unix::UnixSinkConfig;
11use crate::{
12    codecs::{Encoder, EncodingConfig, EncodingConfigWithFraming, SinkType},
13    config::{AcknowledgementsConfig, GenerateConfig, Input, SinkConfig, SinkContext},
14    sinks::util::{tcp::TcpSinkConfig, udp::UdpSinkConfig},
15};
16
/// Configuration for the `socket` sink.
#[configurable_component(sink("socket", "Deliver logs to a remote socket endpoint."))]
#[derive(Clone, Debug)]
pub struct SocketSinkConfig {
    // Flattened: the `mode` tag and the selected mode's own options are read
    // from the same level as the rest of the sink configuration rather than
    // from a nested `mode` table.
    #[serde(flatten)]
    pub mode: Mode,

    // Defaults when absent and is skipped on serialization when
    // `crate::serde::is_default` says so. Deserialization goes through
    // `crate::serde::bool_or_struct` (presumably accepting either a bare
    // boolean or the full struct form; confirm against that helper).
    #[configurable(derived)]
    #[serde(
        default,
        deserialize_with = "crate::serde::bool_or_struct",
        skip_serializing_if = "crate::serde::is_default"
    )]
    pub acknowledgements: AcknowledgementsConfig,
}
32
/// Socket mode.
// Internally tagged: the `mode` key selects the variant (snake_case names:
// `tcp`, `udp`, `unix_stream`, `unix_datagram`), and the remaining keys are
// handed to that variant's payload struct.
#[configurable_component]
#[derive(Clone, Debug)]
#[serde(tag = "mode", rename_all = "snake_case")]
#[configurable(metadata(docs::enum_tag_description = "The type of socket to use."))]
pub enum Mode {
    /// Send over TCP.
    Tcp(TcpMode),

    /// Send over UDP.
    Udp(UdpMode),

    /// Send over a Unix domain socket (UDS), in stream mode.
    // `mode = "unix"` is accepted as an alias for `unix_stream`.
    #[serde(alias = "unix")]
    UnixStream(UnixMode),

    /// Send over a Unix domain socket (UDS), in datagram mode.
    /// Unavailable on macOS, due to send(2)'s apparent non-blocking behavior,
    /// resulting in ENOBUFS errors which we currently don't handle.
    // The variant still exists on every platform; building a sink with it on
    // macOS or on non-Unix targets returns an error instead.
    UnixDatagram(UnixMode),
}
54
/// TCP configuration.
#[configurable_component]
#[derive(Clone, Debug)]
pub struct TcpMode {
    // Transport settings; flattened so its keys sit beside the `mode` tag.
    #[serde(flatten)]
    config: TcpSinkConfig,

    // Serializer plus framing settings, also flattened into the same level.
    // TCP is built as a stream-based sink, so a framer is part of the encoder.
    #[serde(flatten)]
    encoding: EncodingConfigWithFraming,
}
65
/// UDP configuration.
#[configurable_component]
#[derive(Clone, Debug)]
pub struct UdpMode {
    // Transport settings; flattened so its keys sit beside the `mode` tag.
    #[serde(flatten)]
    config: UdpSinkConfig,

    // Serializer settings only: unlike the TCP and Unix modes this is a plain
    // `EncodingConfig` with no framing part, and it is not flattened, so it
    // is read from a nested `encoding` key.
    #[configurable(derived)]
    encoding: EncodingConfig,
}
76
/// Unix Domain Socket configuration.
// Shared payload for both `Mode::UnixStream` and `Mode::UnixDatagram`.
#[configurable_component]
#[derive(Clone, Debug)]
pub struct UnixMode {
    // Socket settings; flattened so its keys sit beside the `mode` tag. On
    // Windows this is the stand-in `UnixSinkConfig` declared in this file
    // rather than the one from `crate::sinks::util::unix`.
    #[serde(flatten)]
    config: UnixSinkConfig,

    // Serializer plus framing settings, flattened into the same level.
    #[serde(flatten)]
    encoding: EncodingConfigWithFraming,
}
87
// Workaround for https://github.com/vectordotdev/vector/issues/22198.
//
// The real `UnixSinkConfig` is imported from `crate::sinks::util::unix` only
// on non-Windows targets. This Windows-only definition keeps `UnixMode` (and
// therefore the `Mode` enum) compiling there; actually building a sink with
// a Unix mode on a non-Unix platform returns an error.
#[cfg(windows)]
/// A Unix Domain Socket sink.
#[configurable_component]
#[derive(Clone, Debug)]
pub struct UnixSinkConfig {
    /// The Unix socket path.
    ///
    /// This should be an absolute path.
    #[configurable(metadata(docs::examples = "/path/to/socket"))]
    pub path: std::path::PathBuf,
}
100
101impl GenerateConfig for SocketSinkConfig {
102    fn generate_config() -> toml::Value {
103        toml::from_str(
104            r#"address = "92.12.333.224:5000"
105            mode = "tcp"
106            encoding.codec = "json""#,
107        )
108        .unwrap()
109    }
110}
111
112impl SocketSinkConfig {
113    pub const fn new(mode: Mode, acknowledgements: AcknowledgementsConfig) -> Self {
114        SocketSinkConfig {
115            mode,
116            acknowledgements,
117        }
118    }
119
120    pub fn make_basic_tcp_config(
121        address: String,
122        acknowledgements: AcknowledgementsConfig,
123    ) -> Self {
124        Self::new(
125            Mode::Tcp(TcpMode {
126                config: TcpSinkConfig::from_address(address),
127                encoding: (None::<FramingConfig>, TextSerializerConfig::default()).into(),
128            }),
129            acknowledgements,
130        )
131    }
132}
133
134#[async_trait::async_trait]
135#[typetag::serde(name = "socket")]
136impl SinkConfig for SocketSinkConfig {
137    async fn build(
138        &self,
139        _cx: SinkContext,
140    ) -> crate::Result<(super::VectorSink, super::Healthcheck)> {
141        match &self.mode {
142            Mode::Tcp(TcpMode { config, encoding }) => {
143                let transformer = encoding.transformer();
144                let (framer, serializer) = encoding.build(SinkType::StreamBased)?;
145                let encoder = Encoder::<Framer>::new(framer, serializer);
146                config.build(transformer, encoder)
147            }
148            Mode::Udp(UdpMode { config, encoding }) => {
149                let transformer = encoding.transformer();
150                let serializer = encoding.build()?;
151                let chunker = serializer.chunker();
152                let encoder = Encoder::<()>::new(serializer);
153                config.build(transformer, encoder, chunker)
154            }
155            #[cfg(unix)]
156            Mode::UnixStream(UnixMode { config, encoding }) => {
157                let transformer = encoding.transformer();
158                let (framer, serializer) = encoding.build(SinkType::StreamBased)?;
159                let encoder = Encoder::<Framer>::new(framer, serializer);
160                config.build(
161                    transformer,
162                    encoder,
163                    super::util::service::net::UnixMode::Stream,
164                )
165            }
166            #[allow(unused)]
167            #[cfg(unix)]
168            Mode::UnixDatagram(UnixMode { config, encoding }) => {
169                cfg_if! {
170                    if #[cfg(not(target_os = "macos"))] {
171                        let transformer = encoding.transformer();
172                        let (framer, serializer) = encoding.build(SinkType::StreamBased)?;
173                        let encoder = Encoder::<Framer>::new(framer, serializer);
174                        config.build(
175                            transformer,
176                            encoder,
177                            super::util::service::net::UnixMode::Datagram,
178                        )
179                    }
180                    else {
181                        Err("UnixDatagram is not available on macOS platforms.".into())
182                    }
183                }
184            }
185            #[cfg(not(unix))]
186            Mode::UnixStream(_) | Mode::UnixDatagram(_) => {
187                Err("Unix modes are supported only on Unix platforms.".into())
188            }
189        }
190    }
191
192    fn input(&self) -> Input {
193        let encoder_input_type = match &self.mode {
194            Mode::Tcp(TcpMode { encoding, .. }) => encoding.config().1.input_type(),
195            Mode::Udp(UdpMode { encoding, .. }) => encoding.config().input_type(),
196            Mode::UnixStream(UnixMode { encoding, .. }) => encoding.config().1.input_type(),
197            Mode::UnixDatagram(UnixMode { encoding, .. }) => encoding.config().1.input_type(),
198        };
199        Input::new(encoder_input_type)
200    }
201
202    fn acknowledgements(&self) -> &AcknowledgementsConfig {
203        &self.acknowledgements
204    }
205}
206
207#[cfg(test)]
208mod test {
209    use std::{
210        future::ready,
211        net::{SocketAddr, UdpSocket},
212    };
213
214    #[cfg(target_os = "windows")]
215    use cfg_if::cfg_if;
216    use futures::stream::StreamExt;
217    use futures_util::stream;
218    use serde_json::Value;
219    use tokio::{
220        net::TcpListener,
221        time::{Duration, sleep, timeout},
222    };
223    use tokio_stream::wrappers::TcpListenerStream;
224    use tokio_util::codec::{FramedRead, LinesCodec};
225    use vector_lib::codecs::JsonSerializerConfig;
226
227    use super::*;
228    cfg_if! { if #[cfg(unix)] {
229        use vector_lib::codecs::NativeJsonSerializerConfig;
230        use crate::test_util::random_metrics_with_stream;
231        use std::path::PathBuf;
232    } }
233    #[cfg(all(unix, not(target_os = "macos")))]
234    use std::os::unix::net::UnixDatagram;
235
236    use crate::{
237        config::SinkContext,
238        event::{Event, LogEvent},
239        test_util::{
240            CountReceiver,
241            components::{SINK_TAGS, assert_sink_compliance, run_and_assert_sink_compliance},
242            next_addr, next_addr_v6, random_lines_with_stream, trace_init,
243        },
244    };
245
246    #[test]
247    fn generate_config() {
248        crate::test_util::test_generate_config::<SocketSinkConfig>();
249    }
250
    /// Receiving end used by `test_datagram`: a bound std socket of the kind
    /// matching the address under test.
    enum DatagramSocket {
        /// A bound UDP socket.
        Udp(UdpSocket),
        /// A bound Unix datagram socket; compiled only on Unix targets other
        /// than macOS.
        #[cfg(all(unix, not(target_os = "macos")))]
        Unix(UnixDatagram),
    }
256
    /// Address handed to `test_datagram`; it selects both the receiver that
    /// is bound and the sink mode that is configured.
    enum DatagramSocketAddr {
        /// A UDP socket address.
        Udp(SocketAddr),
        /// Filesystem path of a Unix datagram socket; compiled only on Unix
        /// targets other than macOS.
        #[cfg(all(unix, not(target_os = "macos")))]
        Unix(PathBuf),
    }
262
263    async fn test_datagram(datagram_addr: DatagramSocketAddr) {
264        let receiver = match &datagram_addr {
265            DatagramSocketAddr::Udp(addr) => DatagramSocket::Udp(UdpSocket::bind(addr).unwrap()),
266            #[cfg(all(unix, not(target_os = "macos")))]
267            DatagramSocketAddr::Unix(path) => {
268                DatagramSocket::Unix(UnixDatagram::bind(path).unwrap())
269            }
270        };
271
272        let config = SocketSinkConfig {
273            mode: match &datagram_addr {
274                DatagramSocketAddr::Udp(addr) => Mode::Udp(UdpMode {
275                    config: UdpSinkConfig::from_address(addr.to_string()),
276                    encoding: JsonSerializerConfig::default().into(),
277                }),
278                #[cfg(all(unix, not(target_os = "macos")))]
279                DatagramSocketAddr::Unix(path) => Mode::UnixDatagram(UnixMode {
280                    config: UnixSinkConfig::new(path.to_path_buf()),
281                    encoding: (None::<FramingConfig>, JsonSerializerConfig::default()).into(),
282                }),
283            },
284            acknowledgements: Default::default(),
285        };
286
287        let context = SinkContext::default();
288        assert_sink_compliance(&SINK_TAGS, async move {
289            let (sink, _healthcheck) = config.build(context).await.unwrap();
290
291            let event = Event::Log(LogEvent::from("raw log line"));
292            sink.run(stream::once(ready(event.into()))).await
293        })
294        .await
295        .expect("Running sink failed");
296
297        let mut buf = [0; 256];
298        let size = match &receiver {
299            DatagramSocket::Udp(sock) => {
300                sock.recv_from(&mut buf).expect("Did not receive message").0
301            }
302            #[cfg(all(unix, not(target_os = "macos")))]
303            DatagramSocket::Unix(sock) => sock.recv(&mut buf).expect("Did not receive message"),
304        };
305
306        let packet = String::from_utf8(buf[..size].to_vec()).expect("Invalid data received");
307        let data = serde_json::from_str::<Value>(&packet).expect("Invalid JSON received");
308        let data = data.as_object().expect("Not a JSON object");
309        assert!(data.get("timestamp").is_some());
310        let message = data.get("message").expect("No message in JSON");
311        assert_eq!(message, &Value::String("raw log line".into()));
312    }
313
314    #[tokio::test]
315    async fn udp_ipv4() {
316        trace_init();
317
318        test_datagram(DatagramSocketAddr::Udp(next_addr())).await;
319    }
320
321    #[tokio::test]
322    async fn udp_ipv6() {
323        trace_init();
324
325        test_datagram(DatagramSocketAddr::Udp(next_addr_v6())).await;
326    }
327
328    #[cfg(all(unix, not(target_os = "macos")))]
329    #[tokio::test]
330    async fn unix_datagram() {
331        trace_init();
332
333        test_datagram(DatagramSocketAddr::Unix(temp_uds_path(
334            "unix_datagram_socket_test",
335        )))
336        .await;
337    }
338
339    #[tokio::test]
340    async fn tcp_stream() {
341        trace_init();
342
343        let addr = next_addr();
344        let config = SocketSinkConfig {
345            mode: Mode::Tcp(TcpMode {
346                config: TcpSinkConfig::from_address(addr.to_string()),
347                encoding: (None::<FramingConfig>, JsonSerializerConfig::default()).into(),
348            }),
349            acknowledgements: Default::default(),
350        };
351
352        let mut receiver = CountReceiver::receive_lines(addr);
353
354        let (lines, events) = random_lines_with_stream(10, 100, None);
355
356        assert_sink_compliance(&SINK_TAGS, async move {
357            let context = SinkContext::default();
358            let (sink, _healthcheck) = config.build(context).await.unwrap();
359
360            sink.run(events).await
361        })
362        .await
363        .expect("Running sink failed");
364
365        // Wait for output to connect
366        receiver.connected().await;
367
368        let output = receiver.await;
369        assert_eq!(lines.len(), output.len());
370        for (source, received) in lines.iter().zip(output) {
371            let json = serde_json::from_str::<Value>(&received).expect("Invalid JSON");
372            let received = json.get("message").unwrap().as_str().unwrap();
373            assert_eq!(source, received);
374        }
375    }
376
377    #[cfg(unix)]
378    #[tokio::test]
379    async fn metrics_socket() {
380        trace_init();
381
382        let out_path = temp_uds_path("unix_socket_test");
383        let mut receiver = CountReceiver::receive_lines_unix(out_path.clone());
384
385        let config = SocketSinkConfig {
386            mode: Mode::UnixStream(UnixMode {
387                config: UnixSinkConfig::new(out_path),
388                encoding: (None::<FramingConfig>, NativeJsonSerializerConfig).into(),
389            }),
390            acknowledgements: Default::default(),
391        };
392
393        let (expected, events) = random_metrics_with_stream(10, None, None);
394
395        assert_sink_compliance(&SINK_TAGS, async move {
396            let context = SinkContext::default();
397            let (sink, _healthcheck) = config.build(context).await.unwrap();
398
399            sink.run(events).await
400        })
401        .await
402        .expect("Running sink failed");
403
404        // Wait for output to connect
405        receiver.connected().await;
406
407        let output = receiver.await;
408        assert_eq!(expected.len(), output.len());
409        for (source, received) in expected.iter().zip(output) {
410            let json = serde_json::from_str::<Value>(&received).expect("Invalid JSON");
411            let received = json.get("metric").unwrap();
412            let received_name = received.get("name").unwrap().as_str().unwrap();
413            assert_eq!(source.as_metric().name(), received_name);
414        }
415    }
416
    // Checks that all events are received when the server performs a clean
    // shutdown of its write side.
    //
    // The test sends 10 events, has the server shut down the connection to
    // force a reconnect, and then sends another 10 events. The server should
    // see 20 messages over 2 connections in total.
    //
    // If this test hangs, events are being lost somewhere: the server task
    // only finishes once both accepted connections have reached EOF.
    #[tokio::test]
    async fn tcp_stream_detects_disconnect() {
        use std::{
            pin::Pin,
            sync::{
                Arc,
                atomic::{AtomicUsize, Ordering},
            },
            task::Poll,
        };

        use futures::{FutureExt, SinkExt, StreamExt, channel::mpsc};
        use tokio::{
            io::{AsyncRead, AsyncWriteExt, ReadBuf},
            net::TcpStream,
            task::yield_now,
            time::{Duration, interval},
        };
        use tokio_stream::wrappers::IntervalStream;

        use crate::{
            event::EventArray,
            tls::{self, MaybeTlsIncomingStream, MaybeTlsSettings, TlsConfig, TlsEnableableConfig},
        };

        trace_init();

        // Sink side: TCP mode with TLS enabled, certificate and hostname
        // verification turned off, and the test CA file configured.
        let addr = next_addr();
        let config = SocketSinkConfig {
            mode: Mode::Tcp(TcpMode {
                config: TcpSinkConfig::new(
                    addr.to_string(),
                    None,
                    Some(TlsEnableableConfig {
                        enabled: Some(true),
                        options: TlsConfig {
                            verify_certificate: Some(false),
                            verify_hostname: Some(false),
                            ca_file: Some(tls::TEST_PEM_CRT_PATH.into()),
                            ..Default::default()
                        },
                    }),
                    None,
                ),
                encoding: (None::<FramingConfig>, TextSerializerConfig::default()).into(),
            }),
            acknowledgements: Default::default(),
        };
        let context = SinkContext::default();
        let (sink, _healthcheck) = config.build(context).await.unwrap();
        // Events are fed to the sink through this channel. `None` is the
        // end-of-input marker: the stream below stops at the first `None`.
        let (mut sender, receiver) = mpsc::channel::<Option<EventArray>>(0);
        let jh1 = tokio::spawn(async move {
            let stream = receiver
                .take_while(|event| ready(event.is_some()))
                .map(|event| event.unwrap())
                .boxed();
            run_and_assert_sink_compliance(sink, stream, &SINK_TAGS).await
        });

        // Counters shared with the server task: non-empty reads seen and
        // connections accepted.
        let msg_counter = Arc::new(AtomicUsize::new(0));
        let msg_counter1 = Arc::clone(&msg_counter);
        let conn_counter = Arc::new(AtomicUsize::new(0));
        let conn_counter1 = Arc::clone(&conn_counter);

        // One-shot signal telling the server to shut down the connection.
        // Wrapped in `Option` so that only the first accepted connection
        // takes it; the second connection gets `None`.
        let (close_tx, close_rx) = tokio::sync::oneshot::channel::<()>();
        let mut close_rx = Some(close_rx.map(|x| x.unwrap()));

        let config = Some(TlsEnableableConfig::test_config());

        // Only accept two connections.
        let jh2 = tokio::spawn(async move {
            let tls = MaybeTlsSettings::from_config(config.as_ref(), true).unwrap();
            let listener = tls.bind(&addr).await.unwrap();
            listener
                .accept_stream()
                .take(2)
                .for_each(|connection| {
                    let mut close_rx = close_rx.take();

                    conn_counter1.fetch_add(1, Ordering::SeqCst);
                    let msg_counter1 = Arc::clone(&msg_counter1);

                    let mut stream: MaybeTlsIncomingStream<TcpStream> = connection.unwrap();

                    // Hand-rolled read loop for one connection; it resolves
                    // once a read returns no data.
                    std::future::poll_fn(move |cx| {
                        loop {
                            // When the close signal has fired, shut the
                            // stream down once and drop the receiver so it is
                            // not polled again.
                            if let Some(fut) = close_rx.as_mut()
                                && let Poll::Ready(()) = fut.poll_unpin(cx)
                            {
                                stream
                                    .get_mut()
                                    .unwrap()
                                    .shutdown()
                                    .now_or_never()
                                    .unwrap()
                                    .unwrap();
                                close_rx = None;
                            }

                            // NOTE(review): 11 bytes presumably matches one
                            // 10-character line plus its newline, so that each
                            // read corresponds to one event; confirm against
                            // `random_lines_with_stream`.
                            let mut buf = [0u8; 11];
                            let mut buf = ReadBuf::new(&mut buf);
                            return match Pin::new(&mut stream).poll_read(cx, &mut buf) {
                                Poll::Ready(Ok(())) => {
                                    if buf.filled().is_empty() {
                                        // Empty read: the peer closed the
                                        // connection, so this one is done.
                                        Poll::Ready(())
                                    } else {
                                        // Every non-empty read counts as one
                                        // message; keep reading.
                                        msg_counter1.fetch_add(1, Ordering::SeqCst);
                                        continue;
                                    }
                                }
                                Poll::Ready(Err(error)) => panic!("{error}"),
                                Poll::Pending => Poll::Pending,
                            };
                        }
                    })
                })
                .await;
        });

        let (_, mut events) = random_lines_with_stream(10, 10, None);
        while let Some(event) = events.next().await {
            sender.send(Some(event)).await.unwrap();
        }

        // Poll the counter every 100 ms, for at most 500 ticks, until the
        // server has seen 10 messages. Once it has, tell the server to shut
        // down, simulating the remote closing an idle connection.
        IntervalStream::new(interval(Duration::from_millis(100)))
            .take(500)
            .take_while(|_| ready(msg_counter.load(Ordering::SeqCst) != 10))
            .for_each(|_| ready(()))
            .await;
        close_tx.send(()).unwrap();

        // Close connection in spawned future
        yield_now().await;

        assert_eq!(msg_counter.load(Ordering::SeqCst), 10);
        assert_eq!(conn_counter.load(Ordering::SeqCst), 1);

        // Send another 10 events
        let (_, mut events) = random_lines_with_stream(10, 10, None);
        while let Some(event) = events.next().await {
            sender.send(Some(event)).await.unwrap();
        }

        // Send the end-of-input marker, then wait for both the sink task and
        // the server task to complete.
        sender.send(None).await.unwrap();
        jh1.await.unwrap();
        jh2.await.unwrap();

        // Check that there are exactly 20 events.
        assert_eq!(msg_counter.load(Ordering::SeqCst), 20);
        assert_eq!(conn_counter.load(Ordering::SeqCst), 2);
    }
581
    /// Tests whether socket recovers from a hard disconnect.
    ///
    /// The sink is started in the background with a large batch of random
    /// lines. A first listener accepts one connection and reads from it until
    /// the `take_while` predicate turns false, after which it goes away. A
    /// second listener on the same address must then see a new connection
    /// within five seconds.
    #[tokio::test]
    async fn reconnect() {
        trace_init();

        let addr = next_addr();
        let config = SocketSinkConfig {
            mode: Mode::Tcp(TcpMode {
                config: TcpSinkConfig::from_address(addr.to_string()),
                encoding: (None::<FramingConfig>, TextSerializerConfig::default()).into(),
            }),
            acknowledgements: Default::default(),
        };

        let context = SinkContext::default();
        let (sink, _healthcheck) = config.build(context).await.unwrap();

        let (_, events) = random_lines_with_stream(1000, 10000, None);
        let sink_handle = tokio::spawn(run_and_assert_sink_compliance(sink, events, &SINK_TAGS));

        // First listener
        //
        // Everything is kept in one statement on purpose: the listener and
        // the accepted socket are temporaries (or owned by them), so they
        // live no longer than this statement.
        let mut count = 20usize;
        TcpListenerStream::new(TcpListener::bind(addr).await.unwrap())
            .next()
            .await
            .unwrap()
            .map(|socket| FramedRead::new(socket, LinesCodec::new()))
            .unwrap()
            .map(|x| x.unwrap())
            // Lets 20 lines through; the predicate returns false on the next
            // line, which ends the stream.
            .take_while(|_| {
                ready(if count > 0 {
                    count -= 1;
                    true
                } else {
                    false
                })
            })
            .collect::<Vec<_>>()
            .await;

        // Disconnect
        if cfg!(windows) {
            // Gives Windows time to release the addr port.
            sleep(Duration::from_secs(1)).await;
        }

        // Second listener
        // If this doesn't succeed then the sink hung.
        assert!(
            timeout(
                Duration::from_secs(5),
                CountReceiver::receive_lines(addr).connected()
            )
            .await
            .is_ok()
        );

        sink_handle.await.unwrap();
    }
641
642    #[cfg(unix)]
643    fn temp_uds_path(name: &str) -> PathBuf {
644        tempfile::tempdir().unwrap().keep().join(name)
645    }
646}