vector/sinks/util/
unix.rs

1use std::{
2    io,
3    os::fd::{AsFd, BorrowedFd},
4    path::PathBuf,
5    pin::Pin,
6    time::Duration,
7};
8
9use async_trait::async_trait;
10use bytes::{Bytes, BytesMut};
11use futures::{stream::BoxStream, SinkExt, StreamExt};
12use snafu::{ResultExt, Snafu};
13use tokio::{
14    io::AsyncWriteExt,
15    net::{UnixDatagram, UnixStream},
16    time::sleep,
17};
18use tokio_util::codec::Encoder;
19use vector_lib::json_size::JsonSize;
20use vector_lib::{
21    configurable::configurable_component,
22    internal_event::{BytesSent, Protocol},
23};
24use vector_lib::{ByteSizeOf, EstimatedJsonEncodedSizeOf};
25
26use crate::{
27    codecs::Transformer,
28    common::backoff::ExponentialBackoff,
29    event::{Event, Finalizable},
30    internal_events::{
31        ConnectionOpen, OpenGauge, SocketMode, UnixSocketConnectionEstablished,
32        UnixSocketOutgoingConnectionError, UnixSocketSendError,
33    },
34    sink_ext::VecSinkExt,
35    sinks::{
36        util::{
37            service::net::UnixMode,
38            socket_bytes_sink::{BytesSink, ShutdownCheck},
39            EncodedEvent, StreamSink,
40        },
41        Healthcheck, VectorSink,
42    },
43};
44
45use super::datagram::{send_datagrams, DatagramSocket};
46
/// Errors that can occur while opening the outgoing Unix socket.
#[derive(Debug, Snafu)]
pub enum UnixError {
    /// Connecting to the socket located at `path` failed; `source` is the underlying I/O error.
    #[snafu(display("Failed connecting to socket at path {}: {}", path.display(), source))]
    ConnectionError {
        source: tokio::io::Error,
        path: PathBuf,
    },

    /// The local socket could not be created; `source` is the underlying I/O error.
    #[snafu(display("Failed to bind socket: {}.", source))]
    FailedToBind { source: std::io::Error },
}
58
/// A Unix Domain Socket sink.
// NOTE(review): the `///` text on this type and its field is presumably picked up by
// `#[configurable_component]` for generated configuration docs — confirm before rewording it.
#[configurable_component]
#[derive(Clone, Debug)]
pub struct UnixSinkConfig {
    /// The Unix socket path.
    ///
    /// This should be an absolute path.
    #[configurable(metadata(docs::examples = "/path/to/socket"))]
    pub path: PathBuf,
}
69
70impl UnixSinkConfig {
71    pub const fn new(path: PathBuf) -> Self {
72        Self { path }
73    }
74
75    pub fn build(
76        &self,
77        transformer: Transformer,
78        encoder: impl Encoder<Event, Error = vector_lib::codecs::encoding::Error>
79            + Clone
80            + Send
81            + Sync
82            + 'static,
83        unix_mode: UnixMode,
84    ) -> crate::Result<(VectorSink, Healthcheck)> {
85        let connector = UnixConnector::new(self.path.clone(), unix_mode);
86        let sink = UnixSink::new(connector.clone(), transformer, encoder);
87        Ok((
88            VectorSink::from_event_streamsink(sink),
89            Box::pin(async move { connector.healthcheck().await }),
90        ))
91    }
92}
93
94pub enum UnixEither {
95    Datagram(UnixDatagram),
96    Stream(UnixStream),
97}
98
99impl UnixEither {
100    pub(super) async fn send(&mut self, buf: &[u8]) -> io::Result<usize> {
101        match self {
102            Self::Datagram(datagram) => datagram.send(buf).await,
103            Self::Stream(stream) => stream.write_all(buf).await.map(|_| buf.len()),
104        }
105    }
106}
107
108impl AsFd for UnixEither {
109    fn as_fd(&self) -> BorrowedFd<'_> {
110        match self {
111            Self::Datagram(datagram) => datagram.as_fd(),
112            Self::Stream(stream) => stream.as_fd(),
113        }
114    }
115}
116
/// Connection settings for the outgoing Unix socket: where to connect and in which mode.
#[derive(Debug, Clone)]
struct UnixConnector {
    /// Filesystem path of the socket to connect to.
    pub path: PathBuf,
    /// Whether connections are opened as stream or datagram sockets.
    mode: UnixMode,
}
122
123impl UnixConnector {
124    const fn new(path: PathBuf, mode: UnixMode) -> Self {
125        Self { path, mode }
126    }
127
128    const fn fresh_backoff() -> ExponentialBackoff {
129        // TODO: make configurable
130        ExponentialBackoff::from_millis(2)
131            .factor(250)
132            .max_delay(Duration::from_secs(60))
133    }
134
135    async fn connect(&self) -> Result<UnixEither, UnixError> {
136        match self.mode {
137            UnixMode::Stream => UnixStream::connect(&self.path)
138                .await
139                .context(ConnectionSnafu {
140                    path: self.path.clone(),
141                })
142                .map(UnixEither::Stream),
143            UnixMode::Datagram => {
144                UnixDatagram::unbound()
145                    .context(FailedToBindSnafu)
146                    .and_then(|datagram| {
147                        datagram
148                            .connect(&self.path)
149                            .context(ConnectionSnafu {
150                                path: self.path.clone(),
151                            })
152                            .map(|_| UnixEither::Datagram(datagram))
153                    })
154            }
155        }
156    }
157
158    async fn connect_backoff(&self) -> UnixEither {
159        let mut backoff = Self::fresh_backoff();
160        loop {
161            match self.connect().await {
162                Ok(stream) => {
163                    emit!(UnixSocketConnectionEstablished { path: &self.path });
164                    return stream;
165                }
166                Err(error) => {
167                    emit!(UnixSocketOutgoingConnectionError { error });
168                    sleep(backoff.next().unwrap()).await;
169                }
170            }
171        }
172    }
173
174    async fn healthcheck(&self) -> crate::Result<()> {
175        self.connect().await.map(|_| ()).map_err(Into::into)
176    }
177}
178
/// Sink that transforms and encodes events and writes them to a Unix socket.
struct UnixSink<E>
where
    E: Encoder<Event, Error = vector_lib::codecs::encoding::Error> + Clone + Send + Sync,
{
    /// Where and how (stream or datagram) to connect.
    connector: UnixConnector,
    /// Applied to every event before it is encoded.
    transformer: Transformer,
    /// Turns each event into the bytes written to the socket.
    encoder: E,
}
187
188impl<E> UnixSink<E>
189where
190    E: Encoder<Event, Error = vector_lib::codecs::encoding::Error> + Clone + Send + Sync,
191{
192    pub const fn new(connector: UnixConnector, transformer: Transformer, encoder: E) -> Self {
193        Self {
194            connector,
195            transformer,
196            encoder,
197        }
198    }
199
200    async fn connect(&mut self) -> BytesSink<UnixStream> {
201        let stream = match self.connector.connect_backoff().await {
202            UnixEither::Stream(stream) => stream,
203            UnixEither::Datagram(_) => unreachable!("connect is only called with Stream mode"),
204        };
205        BytesSink::new(stream, |_| ShutdownCheck::Alive, SocketMode::Unix)
206    }
207
208    async fn run_internal(self: Box<Self>, input: BoxStream<'_, Event>) -> Result<(), ()> {
209        match self.connector.mode {
210            UnixMode::Stream => self.run_stream(input).await,
211            UnixMode::Datagram => self.run_datagram(input).await,
212        }
213    }
214
215    // Same as TcpSink, more details there.
216    async fn run_stream(mut self: Box<Self>, input: BoxStream<'_, Event>) -> Result<(), ()> {
217        let mut encoder = self.encoder.clone();
218        let transformer = self.transformer.clone();
219        let mut input = input
220            .map(|mut event| {
221                let byte_size = event.size_of();
222                let json_byte_size = event.estimated_json_encoded_size_of();
223
224                transformer.transform(&mut event);
225
226                let finalizers = event.take_finalizers();
227                let mut bytes = BytesMut::new();
228
229                // Errors are handled by `Encoder`.
230                if encoder.encode(event, &mut bytes).is_ok() {
231                    let item = bytes.freeze();
232                    EncodedEvent {
233                        item,
234                        finalizers,
235                        byte_size,
236                        json_byte_size,
237                    }
238                } else {
239                    EncodedEvent::new(Bytes::new(), 0, JsonSize::zero())
240                }
241            })
242            .peekable();
243
244        while Pin::new(&mut input).peek().await.is_some() {
245            let mut sink = self.connect().await;
246            let _open_token = OpenGauge::new().open(|count| emit!(ConnectionOpen { count }));
247
248            let result = match sink.send_all_peekable(&mut (&mut input).peekable()).await {
249                Ok(()) => sink.close().await,
250                Err(error) => Err(error),
251            };
252
253            if let Err(error) = result {
254                emit!(UnixSocketSendError {
255                    error: &error,
256                    path: &self.connector.path
257                });
258            }
259        }
260
261        Ok(())
262    }
263
264    async fn run_datagram(self: Box<Self>, input: BoxStream<'_, Event>) -> Result<(), ()> {
265        let bytes_sent = register!(BytesSent::from(Protocol::UNIX));
266        let mut input = input.peekable();
267
268        let mut encoder = self.encoder.clone();
269        while Pin::new(&mut input).peek().await.is_some() {
270            let socket = match self.connector.connect_backoff().await {
271                UnixEither::Datagram(datagram) => datagram,
272                UnixEither::Stream(_) => {
273                    unreachable!("run_datagram is only called with Datagram mode")
274                }
275            };
276
277            send_datagrams(
278                &mut input,
279                DatagramSocket::Unix(socket, self.connector.path.clone()),
280                &self.transformer,
281                &mut encoder,
282                &bytes_sent,
283            )
284            .await;
285        }
286
287        Ok(())
288    }
289}
290
291#[async_trait]
292impl<E> StreamSink<Event> for UnixSink<E>
293where
294    E: Encoder<Event, Error = vector_lib::codecs::encoding::Error> + Clone + Send + Sync,
295{
296    async fn run(mut self: Box<Self>, input: BoxStream<'_, Event>) -> Result<(), ()> {
297        self.run_internal(input).await
298    }
299}
300
#[cfg(test)]
mod tests {
    use tokio::net::UnixListener;
    use vector_lib::codecs::{
        encoding::Framer, BytesEncoder, NewlineDelimitedEncoder, TextSerializerConfig,
    };

    use super::*;
    use crate::{
        codecs::Encoder,
        test_util::{
            components::{assert_sink_compliance, SINK_TAGS},
            random_lines_with_stream, CountReceiver,
        },
    };

    /// Returns a socket path named `name` inside a fresh temporary directory.
    ///
    /// The directory is kept (`keep()`), so it is not removed when the temp-dir handle drops.
    fn temp_uds_path(name: &str) -> PathBuf {
        let dir = tempfile::tempdir().unwrap().keep();
        dir.join(name)
    }

    /// Builds a text-encoding sink for `path` in `mode` and awaits only its healthcheck.
    async fn run_healthcheck(path: PathBuf, mode: UnixMode) -> crate::Result<()> {
        let (_sink, healthcheck) = UnixSinkConfig::new(path)
            .build(
                Default::default(),
                Encoder::<()>::new(TextSerializerConfig::default().build().into()),
                mode,
            )
            .unwrap();
        healthcheck.await
    }

    #[tokio::test]
    async fn unix_sink_healthcheck() {
        // A stream listener exists at this path: only stream mode should pass.
        let good_path = temp_uds_path("valid_stream_uds");
        let _listener = UnixListener::bind(&good_path).unwrap();
        assert!(run_healthcheck(good_path.clone(), UnixMode::Stream)
            .await
            .is_ok());
        assert!(
            run_healthcheck(good_path.clone(), UnixMode::Datagram)
                .await
                .is_err(),
            "datagram mode should fail when attempting to send into a stream mode UDS"
        );

        // Nothing listens at this path: both modes should fail.
        let bad_path = temp_uds_path("no_one_listening");
        assert!(run_healthcheck(bad_path.clone(), UnixMode::Stream)
            .await
            .is_err());
        assert!(run_healthcheck(bad_path.clone(), UnixMode::Datagram)
            .await
            .is_err());
    }

    #[tokio::test]
    async fn basic_unix_sink() {
        let num_lines = 1000;
        let out_path = temp_uds_path("unix_test");

        // Set up server to receive events from the Sink.
        let mut receiver = CountReceiver::receive_lines_unix(out_path.clone());

        // Set up Sink
        let encoder = Encoder::<Framer>::new(
            NewlineDelimitedEncoder::default().into(),
            TextSerializerConfig::default().build().into(),
        );
        let (sink, _healthcheck) = UnixSinkConfig::new(out_path)
            .build(Default::default(), encoder, UnixMode::Stream)
            .unwrap();

        // Send the test data
        let (input_lines, events) = random_lines_with_stream(100, num_lines, None);

        assert_sink_compliance(&SINK_TAGS, async move { sink.run(events).await })
            .await
            .expect("Running sink failed");

        // Wait for output to connect
        receiver.connected().await;

        // Receive the data sent by the Sink to the receiver
        assert_eq!(input_lines, receiver.await);
    }

    #[cfg_attr(target_os = "macos", ignore)]
    #[tokio::test]
    async fn basic_unix_datagram_sink() {
        let num_lines = 1000;
        let out_path = temp_uds_path("unix_datagram_test");

        // Set up listener to receive events from the Sink.
        let receiver = std::os::unix::net::UnixDatagram::bind(out_path.clone()).unwrap();
        let (ready_tx, ready_rx) = tokio::sync::oneshot::channel();

        // Listen in the background to avoid blocking
        let handle = tokio::task::spawn_blocking(move || {
            ready_tx.send(()).expect("failed to signal readiness");

            // One datagram per expected line; the buffer is one byte larger than a line.
            (0..num_lines)
                .map(|_| {
                    let mut buf = [0; 101];
                    let (size, _) = receiver
                        .recv_from(&mut buf)
                        .expect("Did not receive message");
                    String::from_utf8_lossy(&buf[..size]).into_owned()
                })
                .collect::<Vec<String>>()
        });
        ready_rx.await.expect("failed to receive ready signal");

        // Set up Sink
        let encoder = Encoder::<Framer>::new(
            BytesEncoder.into(),
            TextSerializerConfig::default().build().into(),
        );
        let (sink, _healthcheck) = UnixSinkConfig::new(out_path.clone())
            .build(Default::default(), encoder, UnixMode::Datagram)
            .unwrap();

        // Send the test data
        let (input_lines, events) = random_lines_with_stream(100, num_lines, None);

        assert_sink_compliance(&SINK_TAGS, async move { sink.run(events).await })
            .await
            .expect("Running sink failed");

        // Receive the data sent by the Sink to the receiver
        let output_lines = handle.await.expect("UDS Datagram receiver failed");

        assert_eq!(input_lines, output_lines);
    }
}