vector/sources/util/
unix_stream.rs

1use std::{fs::remove_file, path::PathBuf, time::Duration};
2
3use bytes::Bytes;
4use futures::{FutureExt, StreamExt};
5use smallvec::SmallVec;
6use tokio::{
7    io::AsyncWriteExt,
8    net::{UnixListener, UnixStream},
9    time::sleep,
10};
11use tokio_stream::wrappers::UnixListenerStream;
12use tokio_util::codec::FramedRead;
13use tracing::{Instrument, field};
14use vector_lib::{
15    EstimatedJsonEncodedSizeOf,
16    codecs::StreamDecodingError,
17    internal_event::{ByteSize, BytesReceived, InternalEventHandle as _, Protocol},
18};
19
20use super::AfterReadExt;
21use crate::{
22    SourceSender,
23    async_read::VecAsyncReadExt,
24    event::Event,
25    internal_events::{
26        ConnectionOpen, OpenGauge, SocketEventsReceived, SocketMode, StreamClosedError,
27        UnixSocketError, UnixSocketFileDeleteError,
28    },
29    shutdown::ShutdownSignal,
30    sources::{
31        Source,
32        util::{change_socket_permissions, unix::UNNAMED_SOCKET_HOST},
33    },
34};
35
36/// Returns a `Source` object corresponding to a Unix domain stream socket.
37/// Passing in different functions for `decoder` and `handle_events` can allow
38/// for different source-specific logic (such as decoding syslog messages in the
39/// syslog source).
40pub fn build_unix_stream_source<D, F, E>(
41    listen_path: PathBuf,
42    socket_file_mode: Option<u32>,
43    decoder: D,
44    handle_events: impl Fn(&mut [Event], Option<Bytes>) + Clone + Send + Sync + 'static,
45    shutdown: ShutdownSignal,
46    out: SourceSender,
47) -> crate::Result<Source>
48where
49    D: tokio_util::codec::Decoder<Item = (F, usize), Error = E> + Clone + Send + 'static,
50    E: StreamDecodingError + std::fmt::Display + Send,
51    F: Into<SmallVec<[Event; 1]>> + Send,
52{
53    Ok(Box::pin(async move {
54        let listener = UnixListener::bind(&listen_path).unwrap_or_else(|e| {
55            panic!(
56                "Failed to bind to listener socket at path: {}. Err: {}",
57                listen_path.to_string_lossy(),
58                e
59            )
60        });
61        info!(message = "Listening.", path = ?listen_path, r#type = "unix");
62
63        change_socket_permissions(&listen_path, socket_file_mode)
64            .expect("Failed to set socket permissions");
65
66        let bytes_received = register!(BytesReceived::from(Protocol::UNIX));
67
68        let connection_open = OpenGauge::new();
69        let stream = UnixListenerStream::new(listener).take_until(shutdown.clone());
70        tokio::pin!(stream);
71        while let Some(socket) = stream.next().await {
72            let socket = match socket {
73                Err(error) => {
74                    error!(message = "Failed to accept socket.", %error);
75                    continue;
76                }
77                Ok(socket) => socket,
78            };
79
80            let listen_path = listen_path.clone();
81
82            let span = info_span!("connection");
83
84            let received_from: Bytes = socket
85                .peer_addr()
86                .ok()
87                .and_then(|addr| {
88                    addr.as_pathname().map(|e| e.to_owned()).map({
89                        |path| {
90                            span.record("peer_path", field::debug(&path));
91                            path.to_string_lossy().into_owned().into()
92                        }
93                    })
94                })
95                // In most cases, we'll be connecting to this socket from
96                // an unnamed socket (a socket not bound to a
97                // file). Instead of a filename, we'll surface a specific
98                // host value.
99                .unwrap_or_else(|| UNNAMED_SOCKET_HOST.into());
100
101            let handle_events = handle_events.clone();
102
103            let bytes_received = bytes_received.clone();
104            let stream = socket
105                .after_read(move |byte_size| {
106                    bytes_received.emit(ByteSize(byte_size));
107                })
108                .allow_read_until(shutdown.clone().map(|_| ()));
109            let mut stream = FramedRead::new(stream, decoder.clone());
110
111            let connection_open = connection_open.clone();
112            let mut out = out.clone();
113            tokio::spawn(
114                async move {
115                    let _open_token = connection_open.open(|count| emit!(ConnectionOpen { count }));
116
117                    while let Some(result) = stream.next().await {
118                        match result {
119                            Ok((frame, _byte_size)) => {
120                                let mut events = frame.into();
121
122                                emit!(SocketEventsReceived {
123                                    mode: SocketMode::Unix,
124                                    byte_size: events.estimated_json_encoded_size_of(),
125                                    count: events.len(),
126                                });
127
128                                handle_events(&mut events, Some(received_from.clone()));
129
130                                let count = events.len();
131                                if (out.send_batch(events).await).is_err() {
132                                    emit!(StreamClosedError { count });
133                                }
134                            }
135                            Err(error) => {
136                                emit!(UnixSocketError {
137                                    error: &error,
138                                    path: &listen_path
139                                });
140
141                                if !error.can_continue() {
142                                    break;
143                                }
144                            }
145                        }
146                    }
147
148                    info!("Finished sending.");
149
150                    let socket: &mut UnixStream = stream.get_mut().get_mut().get_mut_ref();
151                    if let Err(error) = socket.shutdown().await {
152                        error!(message = "Failed shutting down socket.", %error);
153                    }
154                }
155                .instrument(span.or_current()),
156            );
157        }
158
159        // Wait for open connections to finish
160        while connection_open.any_open() {
161            sleep(Duration::from_millis(10)).await;
162        }
163
164        // Delete socket file
165        if let Err(error) = remove_file(&listen_path) {
166            emit!(UnixSocketFileDeleteError {
167                path: &listen_path,
168                error
169            });
170        }
171
172        Ok(())
173    }))
174}