vector/sources/util/
unix_stream.rs

1use std::{fs::remove_file, path::PathBuf, time::Duration};
2
3use bytes::Bytes;
4use futures::{FutureExt, StreamExt};
5use smallvec::SmallVec;
6use tokio::{
7    io::AsyncWriteExt,
8    net::{UnixListener, UnixStream},
9    time::sleep,
10};
11use tokio_stream::wrappers::UnixListenerStream;
12use tracing::{Instrument, field};
13use vector_lib::{
14    EstimatedJsonEncodedSizeOf,
15    codecs::{DecoderFramedRead, StreamDecodingError},
16    internal_event::{ByteSize, BytesReceived, InternalEventHandle as _, Protocol},
17};
18
19use super::AfterReadExt;
20use crate::{
21    SourceSender,
22    async_read::VecAsyncReadExt,
23    event::Event,
24    internal_events::{
25        ConnectionOpen, OpenGauge, SocketEventsReceived, SocketMode, StreamClosedError,
26        UnixSocketError, UnixSocketFileDeleteError,
27    },
28    shutdown::ShutdownSignal,
29    sources::{
30        Source,
31        util::{change_socket_permissions, unix::UNNAMED_SOCKET_HOST},
32    },
33};
34
35/// Returns a `Source` object corresponding to a Unix domain stream socket.
36/// Passing in different functions for `decoder` and `handle_events` can allow
37/// for different source-specific logic (such as decoding syslog messages in the
38/// syslog source).
39pub fn build_unix_stream_source<D, F, E>(
40    listen_path: PathBuf,
41    socket_file_mode: Option<u32>,
42    decoder: D,
43    handle_events: impl Fn(&mut [Event], Option<Bytes>) + Clone + Send + Sync + 'static,
44    shutdown: ShutdownSignal,
45    out: SourceSender,
46) -> crate::Result<Source>
47where
48    D: tokio_util::codec::Decoder<Item = (F, usize), Error = E> + Clone + Send + 'static,
49    E: StreamDecodingError + std::fmt::Display + Send + From<std::io::Error>,
50    F: Into<SmallVec<[Event; 1]>> + Send,
51{
52    Ok(Box::pin(async move {
53        let listener = UnixListener::bind(&listen_path).unwrap_or_else(|e| {
54            panic!(
55                "Failed to bind to listener socket at path: {}. Err: {}",
56                listen_path.to_string_lossy(),
57                e
58            )
59        });
60        info!(message = "Listening.", path = ?listen_path, r#type = "unix");
61
62        change_socket_permissions(&listen_path, socket_file_mode)
63            .expect("Failed to set socket permissions");
64
65        let bytes_received = register!(BytesReceived::from(Protocol::UNIX));
66
67        let connection_open = OpenGauge::new();
68        let stream = UnixListenerStream::new(listener).take_until(shutdown.clone());
69        tokio::pin!(stream);
70        while let Some(socket) = stream.next().await {
71            let socket = match socket {
72                Err(error) => {
73                    error!(message = "Failed to accept socket.", %error);
74                    continue;
75                }
76                Ok(socket) => socket,
77            };
78
79            let listen_path = listen_path.clone();
80
81            let span = info_span!("connection");
82
83            let received_from: Bytes = socket
84                .peer_addr()
85                .ok()
86                .and_then(|addr| {
87                    addr.as_pathname().map(|e| e.to_owned()).map({
88                        |path| {
89                            span.record("peer_path", field::debug(&path));
90                            path.to_string_lossy().into_owned().into()
91                        }
92                    })
93                })
94                // In most cases, we'll be connecting to this socket from
95                // an unnamed socket (a socket not bound to a
96                // file). Instead of a filename, we'll surface a specific
97                // host value.
98                .unwrap_or_else(|| UNNAMED_SOCKET_HOST.into());
99
100            let handle_events = handle_events.clone();
101
102            let bytes_received = bytes_received.clone();
103            let stream = socket
104                .after_read(move |byte_size| {
105                    bytes_received.emit(ByteSize(byte_size));
106                })
107                .allow_read_until(shutdown.clone().map(|_| ()));
108            let mut stream = DecoderFramedRead::new(stream, decoder.clone());
109
110            let connection_open = connection_open.clone();
111            let mut out = out.clone();
112            tokio::spawn(
113                async move {
114                    let _open_token = connection_open.open(|count| emit!(ConnectionOpen { count }));
115
116                    while let Some(result) = stream.next().await {
117                        match result {
118                            Ok((frame, _byte_size)) => {
119                                let mut events = frame.into();
120
121                                emit!(SocketEventsReceived {
122                                    mode: SocketMode::Unix,
123                                    byte_size: events.estimated_json_encoded_size_of(),
124                                    count: events.len(),
125                                });
126
127                                handle_events(&mut events, Some(received_from.clone()));
128
129                                let count = events.len();
130                                if (out.send_batch(events).await).is_err() {
131                                    emit!(StreamClosedError { count });
132                                }
133                            }
134                            Err(error) => {
135                                emit!(UnixSocketError {
136                                    error: &error,
137                                    path: &listen_path
138                                });
139
140                                if !error.can_continue() {
141                                    break;
142                                }
143                            }
144                        }
145                    }
146
147                    info!("Finished sending.");
148
149                    let socket: &mut UnixStream = stream.get_mut().get_mut().get_mut_ref();
150                    if let Err(error) = socket.shutdown().await {
151                        error!(message = "Failed shutting down socket.", %error);
152                    }
153                }
154                .instrument(span.or_current()),
155            );
156        }
157
158        // Wait for open connections to finish
159        while connection_open.any_open() {
160            sleep(Duration::from_millis(10)).await;
161        }
162
163        // Delete socket file
164        if let Err(error) = remove_file(&listen_path) {
165            emit!(UnixSocketFileDeleteError {
166                path: &listen_path,
167                error
168            });
169        }
170
171        Ok(())
172    }))
173}