vector/sources/util/
unix_stream.rs

1use std::{fs::remove_file, path::PathBuf, time::Duration};
2
3use bytes::Bytes;
4use futures::{FutureExt, StreamExt};
5use smallvec::SmallVec;
6use tokio::{
7    io::AsyncWriteExt,
8    net::{UnixListener, UnixStream},
9    time::sleep,
10};
11use tokio_stream::wrappers::UnixListenerStream;
12use tokio_util::codec::FramedRead;
13use tracing::{field, Instrument};
14use vector_lib::codecs::StreamDecodingError;
15use vector_lib::internal_event::{ByteSize, BytesReceived, InternalEventHandle as _, Protocol};
16use vector_lib::EstimatedJsonEncodedSizeOf;
17
18use super::AfterReadExt;
19use crate::{
20    async_read::VecAsyncReadExt,
21    event::Event,
22    internal_events::{
23        ConnectionOpen, OpenGauge, SocketEventsReceived, SocketMode, StreamClosedError,
24        UnixSocketError, UnixSocketFileDeleteError,
25    },
26    shutdown::ShutdownSignal,
27    sources::util::change_socket_permissions,
28    sources::util::unix::UNNAMED_SOCKET_HOST,
29    sources::Source,
30    SourceSender,
31};
32
33/// Returns a `Source` object corresponding to a Unix domain stream socket.
34/// Passing in different functions for `decoder` and `handle_events` can allow
35/// for different source-specific logic (such as decoding syslog messages in the
36/// syslog source).
37pub fn build_unix_stream_source<D, F, E>(
38    listen_path: PathBuf,
39    socket_file_mode: Option<u32>,
40    decoder: D,
41    handle_events: impl Fn(&mut [Event], Option<Bytes>) + Clone + Send + Sync + 'static,
42    shutdown: ShutdownSignal,
43    out: SourceSender,
44) -> crate::Result<Source>
45where
46    D: tokio_util::codec::Decoder<Item = (F, usize), Error = E> + Clone + Send + 'static,
47    E: StreamDecodingError + std::fmt::Display + Send,
48    F: Into<SmallVec<[Event; 1]>> + Send,
49{
50    Ok(Box::pin(async move {
51        let listener = UnixListener::bind(&listen_path).unwrap_or_else(|e| {
52            panic!(
53                "Failed to bind to listener socket at path: {}. Err: {}",
54                listen_path.to_string_lossy(),
55                e
56            )
57        });
58        info!(message = "Listening.", path = ?listen_path, r#type = "unix");
59
60        change_socket_permissions(&listen_path, socket_file_mode)
61            .expect("Failed to set socket permissions");
62
63        let bytes_received = register!(BytesReceived::from(Protocol::UNIX));
64
65        let connection_open = OpenGauge::new();
66        let stream = UnixListenerStream::new(listener).take_until(shutdown.clone());
67        tokio::pin!(stream);
68        while let Some(socket) = stream.next().await {
69            let socket = match socket {
70                Err(error) => {
71                    error!(message = "Failed to accept socket.", %error);
72                    continue;
73                }
74                Ok(socket) => socket,
75            };
76
77            let listen_path = listen_path.clone();
78
79            let span = info_span!("connection");
80
81            let received_from: Bytes = socket
82                .peer_addr()
83                .ok()
84                .and_then(|addr| {
85                    addr.as_pathname().map(|e| e.to_owned()).map({
86                        |path| {
87                            span.record("peer_path", field::debug(&path));
88                            path.to_string_lossy().into_owned().into()
89                        }
90                    })
91                })
92                // In most cases, we'll be connecting to this socket from
93                // an unnamed socket (a socket not bound to a
94                // file). Instead of a filename, we'll surface a specific
95                // host value.
96                .unwrap_or_else(|| UNNAMED_SOCKET_HOST.into());
97
98            let handle_events = handle_events.clone();
99
100            let bytes_received = bytes_received.clone();
101            let stream = socket
102                .after_read(move |byte_size| {
103                    bytes_received.emit(ByteSize(byte_size));
104                })
105                .allow_read_until(shutdown.clone().map(|_| ()));
106            let mut stream = FramedRead::new(stream, decoder.clone());
107
108            let connection_open = connection_open.clone();
109            let mut out = out.clone();
110            tokio::spawn(
111                async move {
112                    let _open_token = connection_open.open(|count| emit!(ConnectionOpen { count }));
113
114                    while let Some(result) = stream.next().await {
115                        match result {
116                            Ok((frame, _byte_size)) => {
117                                let mut events = frame.into();
118
119                                emit!(SocketEventsReceived {
120                                    mode: SocketMode::Unix,
121                                    byte_size: events.estimated_json_encoded_size_of(),
122                                    count: events.len(),
123                                });
124
125                                handle_events(&mut events, Some(received_from.clone()));
126
127                                let count = events.len();
128                                if (out.send_batch(events).await).is_err() {
129                                    emit!(StreamClosedError { count });
130                                }
131                            }
132                            Err(error) => {
133                                emit!(UnixSocketError {
134                                    error: &error,
135                                    path: &listen_path
136                                });
137
138                                if !error.can_continue() {
139                                    break;
140                                }
141                            }
142                        }
143                    }
144
145                    info!("Finished sending.");
146
147                    let socket: &mut UnixStream = stream.get_mut().get_mut().get_mut_ref();
148                    if let Err(error) = socket.shutdown().await {
149                        error!(message = "Failed shutting down socket.", %error);
150                    }
151                }
152                .instrument(span.or_current()),
153            );
154        }
155
156        // Wait for open connections to finish
157        while connection_open.any_open() {
158            sleep(Duration::from_millis(10)).await;
159        }
160
161        // Delete socket file
162        if let Err(error) = remove_file(&listen_path) {
163            emit!(UnixSocketFileDeleteError {
164                path: &listen_path,
165                error
166            });
167        }
168
169        Ok(())
170    }))
171}