vector/sinks/
socket.rs

1use vector_lib::codecs::{
2    encoding::{Framer, FramingConfig},
3    TextSerializerConfig,
4};
5use vector_lib::configurable::configurable_component;
6
7#[cfg(not(windows))]
8use crate::sinks::util::unix::UnixSinkConfig;
9use crate::{
10    codecs::{Encoder, EncodingConfig, EncodingConfigWithFraming, SinkType},
11    config::{AcknowledgementsConfig, GenerateConfig, Input, SinkConfig, SinkContext},
12    sinks::util::{tcp::TcpSinkConfig, udp::UdpSinkConfig},
13};
14
/// Configuration for the `socket` sink.
#[configurable_component(sink("socket", "Deliver logs to a remote socket endpoint."))]
#[derive(Clone, Debug)]
pub struct SocketSinkConfig {
    // `flatten` places the fields of `Mode` (including its `mode` tag) at the
    // same level as this struct's own fields rather than under a `mode` table.
    #[serde(flatten)]
    pub mode: Mode,

    // Falls back to the default when absent, and is left out of serialized
    // output when `crate::serde::is_default` reports it as default.
    // NOTE(review): going by its name, `bool_or_struct` presumably accepts either
    // a bare boolean or a full table here — confirm against `crate::serde`.
    #[configurable(derived)]
    #[serde(
        default,
        deserialize_with = "crate::serde::bool_or_struct",
        skip_serializing_if = "crate::serde::is_default"
    )]
    pub acknowledgements: AcknowledgementsConfig,
}
30
/// Socket mode.
// Internally tagged: the `mode` key selects the variant, and variant names are
// rendered in snake_case (`tcp`, `udp`, `unix_stream`, `unix_datagram`).
#[configurable_component]
#[derive(Clone, Debug)]
#[serde(tag = "mode", rename_all = "snake_case")]
#[configurable(metadata(docs::enum_tag_description = "The type of socket to use."))]
pub enum Mode {
    /// Send over TCP.
    Tcp(TcpMode),

    /// Send over UDP.
    Udp(UdpMode),

    /// Send over a Unix domain socket (UDS), in stream mode.
    // `unix` is additionally accepted as a name for this variant on input.
    #[serde(alias = "unix")]
    UnixStream(UnixMode),

    /// Send over a Unix domain socket (UDS), in datagram mode.
    /// Unavailable on macOS, due to send(2)'s apparent non-blocking behavior,
    /// resulting in ENOBUFS errors which we currently don't handle.
    // Both Unix variants are declared on every platform; `SinkConfig::build`
    // is where unsupported platforms are rejected with an error.
    UnixDatagram(UnixMode),
}
52
/// TCP configuration.
#[configurable_component]
#[derive(Clone, Debug)]
pub struct TcpMode {
    // Both members are flattened, so the TCP options and the encoding/framing
    // options share one level in the serialized configuration.
    #[serde(flatten)]
    config: TcpSinkConfig,

    // Carries framing as well as serialization settings; `SinkConfig::build`
    // turns this into a `(framer, serializer)` pair.
    #[serde(flatten)]
    encoding: EncodingConfigWithFraming,
}
63
/// UDP configuration.
#[configurable_component]
#[derive(Clone, Debug)]
pub struct UdpMode {
    // Flattened: the UDP options sit alongside `encoding` rather than under a
    // nested `config` key.
    #[serde(flatten)]
    config: UdpSinkConfig,

    // Unlike the TCP and Unix modes this is a plain `EncodingConfig` with no
    // framing part, and it is not flattened, so it lives under an `encoding` key.
    // `SinkConfig::build` pairs it with `Encoder::<()>`.
    #[configurable(derived)]
    encoding: EncodingConfig,
}
74
/// Unix Domain Socket configuration.
// Shared by both `Mode::UnixStream` and `Mode::UnixDatagram`.
#[configurable_component]
#[derive(Clone, Debug)]
pub struct UnixMode {
    // Resolves to the imported `crate::sinks::util::unix::UnixSinkConfig` when
    // not building for Windows, and to the stand-in declared in this file on
    // Windows.
    #[serde(flatten)]
    config: UnixSinkConfig,

    // Framing plus serialization settings, flattened next to the socket options.
    #[serde(flatten)]
    encoding: EncodingConfigWithFraming,
}
85
// Workaround for https://github.com/vectordotdev/vector/issues/22198.
//
// The real `crate::sinks::util::unix::UnixSinkConfig` is only imported when not
// building for Windows, yet `UnixMode` names the type unconditionally. This
// Windows-only stand-in keeps that reference resolvable; it carries nothing but
// the socket path, and `SinkConfig::build` rejects the Unix modes on non-Unix
// platforms before the value is ever used.
#[cfg(windows)]
/// A Unix Domain Socket sink.
#[configurable_component]
#[derive(Clone, Debug)]
pub struct UnixSinkConfig {
    /// The Unix socket path.
    ///
    /// This should be an absolute path.
    #[configurable(metadata(docs::examples = "/path/to/socket"))]
    pub path: std::path::PathBuf,
}
98
99impl GenerateConfig for SocketSinkConfig {
100    fn generate_config() -> toml::Value {
101        toml::from_str(
102            r#"address = "92.12.333.224:5000"
103            mode = "tcp"
104            encoding.codec = "json""#,
105        )
106        .unwrap()
107    }
108}
109
110impl SocketSinkConfig {
111    pub const fn new(mode: Mode, acknowledgements: AcknowledgementsConfig) -> Self {
112        SocketSinkConfig {
113            mode,
114            acknowledgements,
115        }
116    }
117
118    pub fn make_basic_tcp_config(
119        address: String,
120        acknowledgements: AcknowledgementsConfig,
121    ) -> Self {
122        Self::new(
123            Mode::Tcp(TcpMode {
124                config: TcpSinkConfig::from_address(address),
125                encoding: (None::<FramingConfig>, TextSerializerConfig::default()).into(),
126            }),
127            acknowledgements,
128        )
129    }
130}
131
132#[async_trait::async_trait]
133#[typetag::serde(name = "socket")]
134impl SinkConfig for SocketSinkConfig {
135    async fn build(
136        &self,
137        _cx: SinkContext,
138    ) -> crate::Result<(super::VectorSink, super::Healthcheck)> {
139        match &self.mode {
140            Mode::Tcp(TcpMode { config, encoding }) => {
141                let transformer = encoding.transformer();
142                let (framer, serializer) = encoding.build(SinkType::StreamBased)?;
143                let encoder = Encoder::<Framer>::new(framer, serializer);
144                config.build(transformer, encoder)
145            }
146            Mode::Udp(UdpMode { config, encoding }) => {
147                let transformer = encoding.transformer();
148                let serializer = encoding.build()?;
149                let encoder = Encoder::<()>::new(serializer);
150                config.build(transformer, encoder)
151            }
152            #[cfg(unix)]
153            Mode::UnixStream(UnixMode { config, encoding }) => {
154                let transformer = encoding.transformer();
155                let (framer, serializer) = encoding.build(SinkType::StreamBased)?;
156                let encoder = Encoder::<Framer>::new(framer, serializer);
157                config.build(
158                    transformer,
159                    encoder,
160                    super::util::service::net::UnixMode::Stream,
161                )
162            }
163            #[allow(unused)]
164            #[cfg(unix)]
165            Mode::UnixDatagram(UnixMode { config, encoding }) => {
166                cfg_if! {
167                    if #[cfg(not(target_os = "macos"))] {
168                        let transformer = encoding.transformer();
169                        let (framer, serializer) = encoding.build(SinkType::StreamBased)?;
170                        let encoder = Encoder::<Framer>::new(framer, serializer);
171                        config.build(
172                            transformer,
173                            encoder,
174                            super::util::service::net::UnixMode::Datagram,
175                        )
176                    }
177                    else {
178                        Err("UnixDatagram is not available on macOS platforms.".into())
179                    }
180                }
181            }
182            #[cfg(not(unix))]
183            Mode::UnixStream(_) | Mode::UnixDatagram(_) => {
184                Err("Unix modes are supported only on Unix platforms.".into())
185            }
186        }
187    }
188
189    fn input(&self) -> Input {
190        let encoder_input_type = match &self.mode {
191            Mode::Tcp(TcpMode { encoding, .. }) => encoding.config().1.input_type(),
192            Mode::Udp(UdpMode { encoding, .. }) => encoding.config().input_type(),
193            Mode::UnixStream(UnixMode { encoding, .. }) => encoding.config().1.input_type(),
194            Mode::UnixDatagram(UnixMode { encoding, .. }) => encoding.config().1.input_type(),
195        };
196        Input::new(encoder_input_type)
197    }
198
199    fn acknowledgements(&self) -> &AcknowledgementsConfig {
200        &self.acknowledgements
201    }
202}
203
204#[cfg(test)]
205mod test {
206    use std::{
207        future::ready,
208        net::{SocketAddr, UdpSocket},
209    };
210
211    use futures::stream::StreamExt;
212    use futures_util::stream;
213    use serde_json::Value;
214    use tokio::{
215        net::TcpListener,
216        time::{sleep, timeout, Duration},
217    };
218    use tokio_stream::wrappers::TcpListenerStream;
219    use tokio_util::codec::{FramedRead, LinesCodec};
220    use vector_lib::codecs::JsonSerializerConfig;
221
222    use super::*;
223
224    #[cfg(target_os = "windows")]
225    use cfg_if::cfg_if;
226    cfg_if! { if #[cfg(unix)] {
227        use vector_lib::codecs::NativeJsonSerializerConfig;
228        use crate::test_util::random_metrics_with_stream;
229        use std::path::PathBuf;
230    } }
231    #[cfg(all(unix, not(target_os = "macos")))]
232    use std::os::unix::net::UnixDatagram;
233
234    use crate::{
235        config::SinkContext,
236        event::{Event, LogEvent},
237        test_util::{
238            components::{assert_sink_compliance, run_and_assert_sink_compliance, SINK_TAGS},
239            next_addr, next_addr_v6, random_lines_with_stream, trace_init, CountReceiver,
240        },
241    };
242
243    #[test]
244    fn generate_config() {
245        crate::test_util::test_generate_config::<SocketSinkConfig>();
246    }
247
    /// Receiving end used by `test_datagram`: a bound std socket of either kind.
    enum DatagramSocket {
        /// A bound UDP socket.
        Udp(UdpSocket),
        /// A bound Unix datagram socket; compiled only on Unix targets other
        /// than macOS.
        #[cfg(all(unix, not(target_os = "macos")))]
        Unix(UnixDatagram),
    }
253
    /// Destination handed to `test_datagram`; selects both the receiver to bind
    /// and the sink mode to configure.
    enum DatagramSocketAddr {
        /// IP socket address for the UDP mode.
        Udp(SocketAddr),
        /// Filesystem path for the Unix datagram mode; compiled only on Unix
        /// targets other than macOS.
        #[cfg(all(unix, not(target_os = "macos")))]
        Unix(PathBuf),
    }
259
260    async fn test_datagram(datagram_addr: DatagramSocketAddr) {
261        let receiver = match &datagram_addr {
262            DatagramSocketAddr::Udp(addr) => DatagramSocket::Udp(UdpSocket::bind(addr).unwrap()),
263            #[cfg(all(unix, not(target_os = "macos")))]
264            DatagramSocketAddr::Unix(path) => {
265                DatagramSocket::Unix(UnixDatagram::bind(path).unwrap())
266            }
267        };
268
269        let config = SocketSinkConfig {
270            mode: match &datagram_addr {
271                DatagramSocketAddr::Udp(addr) => Mode::Udp(UdpMode {
272                    config: UdpSinkConfig::from_address(addr.to_string()),
273                    encoding: JsonSerializerConfig::default().into(),
274                }),
275                #[cfg(all(unix, not(target_os = "macos")))]
276                DatagramSocketAddr::Unix(path) => Mode::UnixDatagram(UnixMode {
277                    config: UnixSinkConfig::new(path.to_path_buf()),
278                    encoding: (None::<FramingConfig>, JsonSerializerConfig::default()).into(),
279                }),
280            },
281            acknowledgements: Default::default(),
282        };
283
284        let context = SinkContext::default();
285        assert_sink_compliance(&SINK_TAGS, async move {
286            let (sink, _healthcheck) = config.build(context).await.unwrap();
287
288            let event = Event::Log(LogEvent::from("raw log line"));
289            sink.run(stream::once(ready(event.into()))).await
290        })
291        .await
292        .expect("Running sink failed");
293
294        let mut buf = [0; 256];
295        let size = match &receiver {
296            DatagramSocket::Udp(sock) => {
297                sock.recv_from(&mut buf).expect("Did not receive message").0
298            }
299            #[cfg(all(unix, not(target_os = "macos")))]
300            DatagramSocket::Unix(sock) => sock.recv(&mut buf).expect("Did not receive message"),
301        };
302
303        let packet = String::from_utf8(buf[..size].to_vec()).expect("Invalid data received");
304        let data = serde_json::from_str::<Value>(&packet).expect("Invalid JSON received");
305        let data = data.as_object().expect("Not a JSON object");
306        assert!(data.get("timestamp").is_some());
307        let message = data.get("message").expect("No message in JSON");
308        assert_eq!(message, &Value::String("raw log line".into()));
309    }
310
311    #[tokio::test]
312    async fn udp_ipv4() {
313        trace_init();
314
315        test_datagram(DatagramSocketAddr::Udp(next_addr())).await;
316    }
317
318    #[tokio::test]
319    async fn udp_ipv6() {
320        trace_init();
321
322        test_datagram(DatagramSocketAddr::Udp(next_addr_v6())).await;
323    }
324
325    #[cfg(all(unix, not(target_os = "macos")))]
326    #[tokio::test]
327    async fn unix_datagram() {
328        trace_init();
329
330        test_datagram(DatagramSocketAddr::Unix(temp_uds_path(
331            "unix_datagram_socket_test",
332        )))
333        .await;
334    }
335
336    #[tokio::test]
337    async fn tcp_stream() {
338        trace_init();
339
340        let addr = next_addr();
341        let config = SocketSinkConfig {
342            mode: Mode::Tcp(TcpMode {
343                config: TcpSinkConfig::from_address(addr.to_string()),
344                encoding: (None::<FramingConfig>, JsonSerializerConfig::default()).into(),
345            }),
346            acknowledgements: Default::default(),
347        };
348
349        let mut receiver = CountReceiver::receive_lines(addr);
350
351        let (lines, events) = random_lines_with_stream(10, 100, None);
352
353        assert_sink_compliance(&SINK_TAGS, async move {
354            let context = SinkContext::default();
355            let (sink, _healthcheck) = config.build(context).await.unwrap();
356
357            sink.run(events).await
358        })
359        .await
360        .expect("Running sink failed");
361
362        // Wait for output to connect
363        receiver.connected().await;
364
365        let output = receiver.await;
366        assert_eq!(lines.len(), output.len());
367        for (source, received) in lines.iter().zip(output) {
368            let json = serde_json::from_str::<Value>(&received).expect("Invalid JSON");
369            let received = json.get("message").unwrap().as_str().unwrap();
370            assert_eq!(source, received);
371        }
372    }
373
374    #[cfg(unix)]
375    #[tokio::test]
376    async fn metrics_socket() {
377        trace_init();
378
379        let out_path = temp_uds_path("unix_socket_test");
380        let mut receiver = CountReceiver::receive_lines_unix(out_path.clone());
381
382        let config = SocketSinkConfig {
383            mode: Mode::UnixStream(UnixMode {
384                config: UnixSinkConfig::new(out_path),
385                encoding: (None::<FramingConfig>, NativeJsonSerializerConfig).into(),
386            }),
387            acknowledgements: Default::default(),
388        };
389
390        let (expected, events) = random_metrics_with_stream(10, None, None);
391
392        assert_sink_compliance(&SINK_TAGS, async move {
393            let context = SinkContext::default();
394            let (sink, _healthcheck) = config.build(context).await.unwrap();
395
396            sink.run(events).await
397        })
398        .await
399        .expect("Running sink failed");
400
401        // Wait for output to connect
402        receiver.connected().await;
403
404        let output = receiver.await;
405        assert_eq!(expected.len(), output.len());
406        for (source, received) in expected.iter().zip(output) {
407            let json = serde_json::from_str::<Value>(&received).expect("Invalid JSON");
408            let received = json.get("metric").unwrap();
409            let received_name = received.get("name").unwrap().as_str().unwrap();
410            assert_eq!(source.as_metric().name(), received_name);
411        }
412    }
413
    // This is a test that checks that we properly receive all events in the
    // case of a proper server side write side shutdown.
    //
    // This test basically sends 10 events, shuts down the server and forces a
    // reconnect. It then forces another 10 events through and we should get a
    // total of 20 events.
    //
    // If this test hangs that means somewhere we are not collecting the correct
    // events.
    #[tokio::test]
    async fn tcp_stream_detects_disconnect() {
        use std::{
            pin::Pin,
            sync::{
                atomic::{AtomicUsize, Ordering},
                Arc,
            },
            task::Poll,
        };

        use futures::{channel::mpsc, FutureExt, SinkExt, StreamExt};
        use tokio::{
            io::{AsyncRead, AsyncWriteExt, ReadBuf},
            net::TcpStream,
            task::yield_now,
            time::{interval, Duration},
        };
        use tokio_stream::wrappers::IntervalStream;

        use crate::event::EventArray;
        use crate::tls::{
            self, MaybeTlsIncomingStream, MaybeTlsSettings, TlsConfig, TlsEnableableConfig,
        };

        trace_init();

        // Sink side: TCP with TLS enabled, certificate and hostname verification
        // turned off, and the test CA file supplied.
        let addr = next_addr();
        let config = SocketSinkConfig {
            mode: Mode::Tcp(TcpMode {
                config: TcpSinkConfig::new(
                    addr.to_string(),
                    None,
                    Some(TlsEnableableConfig {
                        enabled: Some(true),
                        options: TlsConfig {
                            verify_certificate: Some(false),
                            verify_hostname: Some(false),
                            ca_file: Some(tls::TEST_PEM_CRT_PATH.into()),
                            ..Default::default()
                        },
                    }),
                    None,
                ),
                encoding: (None::<FramingConfig>, TextSerializerConfig::default()).into(),
            }),
            acknowledgements: Default::default(),
        };
        let context = SinkContext::default();
        let (sink, _healthcheck) = config.build(context).await.unwrap();
        // Events are fed to the sink through a channel created with buffer size 0.
        // `None` acts as the end-of-input sentinel: the stream below stops at the
        // first `None` it sees.
        let (mut sender, receiver) = mpsc::channel::<Option<EventArray>>(0);
        let jh1 = tokio::spawn(async move {
            let stream = receiver
                .take_while(|event| ready(event.is_some()))
                .map(|event| event.unwrap())
                .boxed();
            run_and_assert_sink_compliance(sink, stream, &SINK_TAGS).await
        });

        // Counters shared with the server task: non-empty reads seen, and
        // connections accepted.
        let msg_counter = Arc::new(AtomicUsize::new(0));
        let msg_counter1 = Arc::clone(&msg_counter);
        let conn_counter = Arc::new(AtomicUsize::new(0));
        let conn_counter1 = Arc::clone(&conn_counter);

        // One-shot signal telling the server to shut down its side of the first
        // connection. It is wrapped in `Option` so that only the first accepted
        // connection can `take()` it; later connections get `None`.
        let (close_tx, close_rx) = tokio::sync::oneshot::channel::<()>();
        let mut close_rx = Some(close_rx.map(|x| x.unwrap()));

        let config = Some(TlsEnableableConfig::test_config());

        // Only accept two connections.
        let jh2 = tokio::spawn(async move {
            let tls = MaybeTlsSettings::from_config(config.as_ref(), true).unwrap();
            let listener = tls.bind(&addr).await.unwrap();
            listener
                .accept_stream()
                .take(2)
                .for_each(|connection| {
                    let mut close_rx = close_rx.take();

                    conn_counter1.fetch_add(1, Ordering::SeqCst);
                    let msg_counter1 = Arc::clone(&msg_counter1);

                    let mut stream: MaybeTlsIncomingStream<TcpStream> = connection.unwrap();

                    // Hand-rolled future for one connection: it resolves once a
                    // read returns zero bytes, and panics on a read error.
                    std::future::poll_fn(move |cx| loop {
                        // If the close signal has fired, shut the stream down
                        // once and stop polling the signal afterwards.
                        if let Some(fut) = close_rx.as_mut() {
                            if let Poll::Ready(()) = fut.poll_unpin(cx) {
                                stream
                                    .get_mut()
                                    .unwrap()
                                    .shutdown()
                                    .now_or_never()
                                    .unwrap()
                                    .unwrap();
                                close_rx = None;
                            }
                        }

                        // Every non-empty read is counted as one message.
                        // NOTE(review): the 11-byte buffer presumably matches one
                        // 10-character line plus its newline delimiter — confirm
                        // against `random_lines_with_stream`.
                        let mut buf = [0u8; 11];
                        let mut buf = ReadBuf::new(&mut buf);
                        return match Pin::new(&mut stream).poll_read(cx, &mut buf) {
                            Poll::Ready(Ok(())) => {
                                if buf.filled().is_empty() {
                                    Poll::Ready(())
                                } else {
                                    msg_counter1.fetch_add(1, Ordering::SeqCst);
                                    continue;
                                }
                            }
                            Poll::Ready(Err(error)) => panic!("{}", error),
                            Poll::Pending => Poll::Pending,
                        };
                    })
                })
                .await;
        });

        // First batch of events.
        let (_, mut events) = random_lines_with_stream(10, 10, None);
        while let Some(event) = events.next().await {
            sender.send(Some(event)).await.unwrap();
        }

        // Loop and check for 10 events; we should always get 10 events. Once
        // we have 10 events we can tell the server to shut down to simulate the
        // remote shutting down on an idle connection.
        //
        // The check runs on a 100 ms interval for at most 500 ticks; if the
        // count never reaches 10 the assertions below fail.
        IntervalStream::new(interval(Duration::from_millis(100)))
            .take(500)
            .take_while(|_| ready(msg_counter.load(Ordering::SeqCst) != 10))
            .for_each(|_| ready(()))
            .await;
        close_tx.send(()).unwrap();

        // Close connection in spawned future
        yield_now().await;

        assert_eq!(msg_counter.load(Ordering::SeqCst), 10);
        assert_eq!(conn_counter.load(Ordering::SeqCst), 1);

        // Send another 10 events
        let (_, mut events) = random_lines_with_stream(10, 10, None);
        while let Some(event) = events.next().await {
            sender.send(Some(event)).await.unwrap();
        }

        // Wait for server task to be complete.
        // The `None` sentinel ends the sink's input stream so `jh1` can finish.
        sender.send(None).await.unwrap();
        jh1.await.unwrap();
        jh2.await.unwrap();

        // Check that there are exactly 20 events.
        assert_eq!(msg_counter.load(Ordering::SeqCst), 20);
        assert_eq!(conn_counter.load(Ordering::SeqCst), 2);
    }
576
577    /// Tests whether socket recovers from a hard disconnect.
578    #[tokio::test]
579    async fn reconnect() {
580        trace_init();
581
582        let addr = next_addr();
583        let config = SocketSinkConfig {
584            mode: Mode::Tcp(TcpMode {
585                config: TcpSinkConfig::from_address(addr.to_string()),
586                encoding: (None::<FramingConfig>, TextSerializerConfig::default()).into(),
587            }),
588            acknowledgements: Default::default(),
589        };
590
591        let context = SinkContext::default();
592        let (sink, _healthcheck) = config.build(context).await.unwrap();
593
594        let (_, events) = random_lines_with_stream(1000, 10000, None);
595        let sink_handle = tokio::spawn(run_and_assert_sink_compliance(sink, events, &SINK_TAGS));
596
597        // First listener
598        let mut count = 20usize;
599        TcpListenerStream::new(TcpListener::bind(addr).await.unwrap())
600            .next()
601            .await
602            .unwrap()
603            .map(|socket| FramedRead::new(socket, LinesCodec::new()))
604            .unwrap()
605            .map(|x| x.unwrap())
606            .take_while(|_| {
607                ready(if count > 0 {
608                    count -= 1;
609                    true
610                } else {
611                    false
612                })
613            })
614            .collect::<Vec<_>>()
615            .await;
616
617        // Disconnect
618        if cfg!(windows) {
619            // Gives Windows time to release the addr port.
620            sleep(Duration::from_secs(1)).await;
621        }
622
623        // Second listener
624        // If this doesn't succeed then the sink hanged.
625        assert!(timeout(
626            Duration::from_secs(5),
627            CountReceiver::receive_lines(addr).connected()
628        )
629        .await
630        .is_ok());
631
632        sink_handle.await.unwrap();
633    }
634
635    #[cfg(unix)]
636    fn temp_uds_path(name: &str) -> PathBuf {
637        tempfile::tempdir().unwrap().keep().join(name)
638    }
639}