vector/sinks/util/unix.rs

1use std::{
2    io,
3    os::fd::{AsFd, BorrowedFd},
4    path::PathBuf,
5    pin::Pin,
6};
7
8use async_trait::async_trait;
9use bytes::{Bytes, BytesMut};
10use futures::{SinkExt, StreamExt, stream::BoxStream};
11use snafu::{ResultExt, Snafu};
12use tokio::{
13    io::AsyncWriteExt,
14    net::{UnixDatagram, UnixStream},
15    time::sleep,
16};
17use tokio_util::codec::Encoder;
18use vector_lib::{
19    ByteSizeOf, EstimatedJsonEncodedSizeOf,
20    configurable::configurable_component,
21    internal_event::{BytesSent, Protocol},
22    json_size::JsonSize,
23};
24
25use super::datagram::{DatagramSocket, send_datagrams};
26use crate::{
27    codecs::Transformer,
28    common::backoff::ExponentialBackoff,
29    event::{Event, Finalizable},
30    internal_events::{
31        ConnectionOpen, OpenGauge, SocketMode, UnixSocketConnectionEstablished,
32        UnixSocketOutgoingConnectionError, UnixSocketSendError,
33    },
34    sink_ext::VecSinkExt,
35    sinks::{
36        Healthcheck, VectorSink,
37        util::{
38            EncodedEvent, StreamSink,
39            service::net::UnixMode,
40            socket_bytes_sink::{BytesSink, ShutdownCheck},
41        },
42    },
43};
44
45#[derive(Debug, Snafu)]
46pub enum UnixError {
47    #[snafu(display("Failed connecting to socket at path {}: {}", path.display(), source))]
48    ConnectionError {
49        source: tokio::io::Error,
50        path: PathBuf,
51    },
52
53    #[snafu(display("Failed to bind socket: {}.", source))]
54    FailedToBind { source: std::io::Error },
55}
56
// NOTE(review): the `///` doc comments in this struct appear to be consumed by
// `#[configurable_component]` (together with the `docs::examples` metadata) to produce
// user-facing configuration documentation — treat edits to them as docs changes.
/// A Unix Domain Socket sink.
#[configurable_component]
#[derive(Clone, Debug)]
pub struct UnixSinkConfig {
    // Handed to `UnixConnector` as the connect target by `build`.
    /// The Unix socket path.
    ///
    /// This should be an absolute path.
    #[configurable(metadata(docs::examples = "/path/to/socket"))]
    pub path: PathBuf,
}
67
68impl UnixSinkConfig {
69    pub const fn new(path: PathBuf) -> Self {
70        Self { path }
71    }
72
73    pub fn build(
74        &self,
75        transformer: Transformer,
76        encoder: impl Encoder<Event, Error = vector_lib::codecs::encoding::Error>
77        + Clone
78        + Send
79        + Sync
80        + 'static,
81        unix_mode: UnixMode,
82    ) -> crate::Result<(VectorSink, Healthcheck)> {
83        let connector = UnixConnector::new(self.path.clone(), unix_mode);
84        let sink = UnixSink::new(connector.clone(), transformer, encoder);
85        Ok((
86            VectorSink::from_event_streamsink(sink),
87            Box::pin(async move { connector.healthcheck().await }),
88        ))
89    }
90}
91
92pub enum UnixEither {
93    Datagram(UnixDatagram),
94    Stream(UnixStream),
95}
96
97impl UnixEither {
98    pub(super) async fn send(&mut self, buf: &[u8]) -> io::Result<usize> {
99        match self {
100            Self::Datagram(datagram) => datagram.send(buf).await,
101            Self::Stream(stream) => stream.write_all(buf).await.map(|_| buf.len()),
102        }
103    }
104}
105
106impl AsFd for UnixEither {
107    fn as_fd(&self) -> BorrowedFd<'_> {
108        match self {
109            Self::Datagram(datagram) => datagram.as_fd(),
110            Self::Stream(stream) => stream.as_fd(),
111        }
112    }
113}
114
115#[derive(Debug, Clone)]
116struct UnixConnector {
117    pub path: PathBuf,
118    mode: UnixMode,
119}
120
121impl UnixConnector {
122    const fn new(path: PathBuf, mode: UnixMode) -> Self {
123        Self { path, mode }
124    }
125
126    fn fresh_backoff() -> ExponentialBackoff {
127        // TODO: make configurable
128        ExponentialBackoff::default()
129    }
130
131    async fn connect(&self) -> Result<UnixEither, UnixError> {
132        match self.mode {
133            UnixMode::Stream => UnixStream::connect(&self.path)
134                .await
135                .context(ConnectionSnafu {
136                    path: self.path.clone(),
137                })
138                .map(UnixEither::Stream),
139            UnixMode::Datagram => {
140                UnixDatagram::unbound()
141                    .context(FailedToBindSnafu)
142                    .and_then(|datagram| {
143                        datagram
144                            .connect(&self.path)
145                            .context(ConnectionSnafu {
146                                path: self.path.clone(),
147                            })
148                            .map(|_| UnixEither::Datagram(datagram))
149                    })
150            }
151        }
152    }
153
154    async fn connect_backoff(&self) -> UnixEither {
155        let mut backoff = Self::fresh_backoff();
156        loop {
157            match self.connect().await {
158                Ok(stream) => {
159                    emit!(UnixSocketConnectionEstablished { path: &self.path });
160                    return stream;
161                }
162                Err(error) => {
163                    emit!(UnixSocketOutgoingConnectionError { error });
164                    sleep(backoff.next().unwrap()).await;
165                }
166            }
167        }
168    }
169
170    async fn healthcheck(&self) -> crate::Result<()> {
171        self.connect().await.map(|_| ()).map_err(Into::into)
172    }
173}
174
175struct UnixSink<E>
176where
177    E: Encoder<Event, Error = vector_lib::codecs::encoding::Error> + Clone + Send + Sync,
178{
179    connector: UnixConnector,
180    transformer: Transformer,
181    encoder: E,
182}
183
184impl<E> UnixSink<E>
185where
186    E: Encoder<Event, Error = vector_lib::codecs::encoding::Error> + Clone + Send + Sync,
187{
188    pub const fn new(connector: UnixConnector, transformer: Transformer, encoder: E) -> Self {
189        Self {
190            connector,
191            transformer,
192            encoder,
193        }
194    }
195
196    async fn connect(&mut self) -> BytesSink<UnixStream> {
197        let stream = match self.connector.connect_backoff().await {
198            UnixEither::Stream(stream) => stream,
199            UnixEither::Datagram(_) => unreachable!("connect is only called with Stream mode"),
200        };
201        BytesSink::new(stream, |_| ShutdownCheck::Alive, SocketMode::Unix)
202    }
203
204    async fn run_internal(self: Box<Self>, input: BoxStream<'_, Event>) -> Result<(), ()> {
205        match self.connector.mode {
206            UnixMode::Stream => self.run_stream(input).await,
207            UnixMode::Datagram => self.run_datagram(input).await,
208        }
209    }
210
211    // Same as TcpSink, more details there.
212    async fn run_stream(mut self: Box<Self>, input: BoxStream<'_, Event>) -> Result<(), ()> {
213        let mut encoder = self.encoder.clone();
214        let transformer = self.transformer.clone();
215        let mut input = input
216            .map(|mut event| {
217                let byte_size = event.size_of();
218                let json_byte_size = event.estimated_json_encoded_size_of();
219
220                transformer.transform(&mut event);
221
222                let finalizers = event.take_finalizers();
223                let mut bytes = BytesMut::new();
224
225                // Errors are handled by `Encoder`.
226                if encoder.encode(event, &mut bytes).is_ok() {
227                    let item = bytes.freeze();
228                    EncodedEvent {
229                        item,
230                        finalizers,
231                        byte_size,
232                        json_byte_size,
233                    }
234                } else {
235                    EncodedEvent::new(Bytes::new(), 0, JsonSize::zero())
236                }
237            })
238            .peekable();
239
240        while Pin::new(&mut input).peek().await.is_some() {
241            let mut sink = self.connect().await;
242            let _open_token = OpenGauge::new().open(|count| emit!(ConnectionOpen { count }));
243
244            let result = match sink.send_all_peekable(&mut (&mut input).peekable()).await {
245                Ok(()) => sink.close().await,
246                Err(error) => Err(error),
247            };
248
249            if let Err(error) = result {
250                emit!(UnixSocketSendError {
251                    error: &error,
252                    path: &self.connector.path
253                });
254            }
255        }
256
257        Ok(())
258    }
259
260    async fn run_datagram(self: Box<Self>, input: BoxStream<'_, Event>) -> Result<(), ()> {
261        let bytes_sent = register!(BytesSent::from(Protocol::UNIX));
262        let mut input = input.peekable();
263
264        let mut encoder = self.encoder.clone();
265        while Pin::new(&mut input).peek().await.is_some() {
266            let socket = match self.connector.connect_backoff().await {
267                UnixEither::Datagram(datagram) => datagram,
268                UnixEither::Stream(_) => {
269                    unreachable!("run_datagram is only called with Datagram mode")
270                }
271            };
272
273            send_datagrams(
274                &mut input,
275                DatagramSocket::Unix(socket, self.connector.path.clone()),
276                &self.transformer,
277                &mut encoder,
278                &None,
279                &bytes_sent,
280            )
281            .await;
282        }
283
284        Ok(())
285    }
286}
287
288#[async_trait]
289impl<E> StreamSink<Event> for UnixSink<E>
290where
291    E: Encoder<Event, Error = vector_lib::codecs::encoding::Error> + Clone + Send + Sync,
292{
293    async fn run(mut self: Box<Self>, input: BoxStream<'_, Event>) -> Result<(), ()> {
294        self.run_internal(input).await
295    }
296}
297
#[cfg(test)]
mod tests {
    use tokio::net::UnixListener;
    use vector_lib::codecs::{
        BytesEncoder, NewlineDelimitedEncoder, TextSerializerConfig, encoding::Framer,
    };

    use super::*;
    use crate::{
        codecs::Encoder,
        test_util::{
            CountReceiver,
            components::{SINK_TAGS, assert_sink_compliance},
            random_lines_with_stream,
        },
    };

    /// Returns `<fresh temp dir>/<name>` for use as a socket path.
    ///
    /// The directory is detached with `keep()`, so it is not removed when the guard drops.
    fn temp_uds_path(name: &str) -> PathBuf {
        let dir = tempfile::tempdir().unwrap().keep();
        dir.join(name)
    }

    /// Builds a text-encoding sink for `path` in `mode` and reports whether its
    /// healthcheck succeeds.
    async fn healthcheck_passes(path: PathBuf, mode: UnixMode) -> bool {
        let (_sink, healthcheck) = UnixSinkConfig::new(path)
            .build(
                Default::default(),
                Encoder::<()>::new(TextSerializerConfig::default().build().into()),
                mode,
            )
            .unwrap();
        healthcheck.await.is_ok()
    }

    #[tokio::test]
    async fn unix_sink_healthcheck() {
        // A stream listener exists at this path.
        let good_path = temp_uds_path("valid_stream_uds");
        let _listener = UnixListener::bind(&good_path).unwrap();
        assert!(healthcheck_passes(good_path.clone(), UnixMode::Stream).await);
        assert!(
            !healthcheck_passes(good_path.clone(), UnixMode::Datagram).await,
            "datagram mode should fail when attempting to send into a stream mode UDS"
        );

        // Nothing is bound at this path.
        let bad_path = temp_uds_path("no_one_listening");
        assert!(!healthcheck_passes(bad_path.clone(), UnixMode::Stream).await);
        assert!(!healthcheck_passes(bad_path, UnixMode::Datagram).await);
    }

    #[tokio::test]
    async fn basic_unix_sink() {
        let num_lines = 1000;
        let out_path = temp_uds_path("unix_test");

        // Server side: collects the lines the sink delivers.
        let mut receiver = CountReceiver::receive_lines_unix(out_path.clone());

        // Sink side: newline-framed text over a stream socket.
        let encoder = Encoder::<Framer>::new(
            NewlineDelimitedEncoder::default().into(),
            TextSerializerConfig::default().build().into(),
        );
        let (sink, _healthcheck) = UnixSinkConfig::new(out_path)
            .build(Default::default(), encoder, UnixMode::Stream)
            .unwrap();

        let (input_lines, events) = random_lines_with_stream(100, num_lines, None);

        assert_sink_compliance(&SINK_TAGS, async move { sink.run(events).await })
            .await
            .expect("Running sink failed");

        // Wait for output to connect
        receiver.connected().await;

        // Everything sent must arrive, in order.
        assert_eq!(input_lines, receiver.await);
    }

    #[cfg_attr(target_os = "macos", ignore)]
    #[tokio::test]
    async fn basic_unix_datagram_sink() {
        let num_lines: usize = 1000;
        let out_path = temp_uds_path("unix_datagram_test");

        // Bind the receiving socket up front so the sink has a destination.
        let receiver = std::os::unix::net::UnixDatagram::bind(out_path.clone()).unwrap();
        let (ready_tx, ready_rx) = tokio::sync::oneshot::channel();

        // `recv_from` blocks, so gather datagrams on the blocking pool.
        let handle = tokio::task::spawn_blocking(move || {
            ready_tx.send(()).expect("failed to signal readiness");
            (0..num_lines)
                .map(|_| {
                    let mut buf = [0; 101];
                    let (size, _) = receiver
                        .recv_from(&mut buf)
                        .expect("Did not receive message");
                    String::from_utf8_lossy(&buf[..size]).to_string()
                })
                .collect::<Vec<String>>()
        });
        ready_rx.await.expect("failed to receive ready signal");

        // Sink side: unframed text, one event per datagram.
        let encoder = Encoder::<Framer>::new(
            BytesEncoder.into(),
            TextSerializerConfig::default().build().into(),
        );
        let (sink, _healthcheck) = UnixSinkConfig::new(out_path.clone())
            .build(Default::default(), encoder, UnixMode::Datagram)
            .unwrap();

        let (input_lines, events) = random_lines_with_stream(100, num_lines, None);

        assert_sink_compliance(&SINK_TAGS, async move { sink.run(events).await })
            .await
            .expect("Running sink failed");

        let output_lines = handle.await.expect("UDS Datagram receiver failed");

        assert_eq!(input_lines, output_lines);
    }
}