vector/sources/util/
unix_stream.rs1use std::{fs::remove_file, path::PathBuf, time::Duration};
2
3use bytes::Bytes;
4use futures::{FutureExt, StreamExt};
5use smallvec::SmallVec;
6use tokio::{
7 io::AsyncWriteExt,
8 net::{UnixListener, UnixStream},
9 time::sleep,
10};
11use tokio_stream::wrappers::UnixListenerStream;
12use tokio_util::codec::FramedRead;
13use tracing::{Instrument, field};
14use vector_lib::{
15 EstimatedJsonEncodedSizeOf,
16 codecs::StreamDecodingError,
17 internal_event::{ByteSize, BytesReceived, InternalEventHandle as _, Protocol},
18};
19
20use super::AfterReadExt;
21use crate::{
22 SourceSender,
23 async_read::VecAsyncReadExt,
24 event::Event,
25 internal_events::{
26 ConnectionOpen, OpenGauge, SocketEventsReceived, SocketMode, StreamClosedError,
27 UnixSocketError, UnixSocketFileDeleteError,
28 },
29 shutdown::ShutdownSignal,
30 sources::{
31 Source,
32 util::{change_socket_permissions, unix::UNNAMED_SOCKET_HOST},
33 },
34};
35
36pub fn build_unix_stream_source<D, F, E>(
41 listen_path: PathBuf,
42 socket_file_mode: Option<u32>,
43 decoder: D,
44 handle_events: impl Fn(&mut [Event], Option<Bytes>) + Clone + Send + Sync + 'static,
45 shutdown: ShutdownSignal,
46 out: SourceSender,
47) -> crate::Result<Source>
48where
49 D: tokio_util::codec::Decoder<Item = (F, usize), Error = E> + Clone + Send + 'static,
50 E: StreamDecodingError + std::fmt::Display + Send,
51 F: Into<SmallVec<[Event; 1]>> + Send,
52{
53 Ok(Box::pin(async move {
54 let listener = UnixListener::bind(&listen_path).unwrap_or_else(|e| {
55 panic!(
56 "Failed to bind to listener socket at path: {}. Err: {}",
57 listen_path.to_string_lossy(),
58 e
59 )
60 });
61 info!(message = "Listening.", path = ?listen_path, r#type = "unix");
62
63 change_socket_permissions(&listen_path, socket_file_mode)
64 .expect("Failed to set socket permissions");
65
66 let bytes_received = register!(BytesReceived::from(Protocol::UNIX));
67
68 let connection_open = OpenGauge::new();
69 let stream = UnixListenerStream::new(listener).take_until(shutdown.clone());
70 tokio::pin!(stream);
71 while let Some(socket) = stream.next().await {
72 let socket = match socket {
73 Err(error) => {
74 error!(message = "Failed to accept socket.", %error);
75 continue;
76 }
77 Ok(socket) => socket,
78 };
79
80 let listen_path = listen_path.clone();
81
82 let span = info_span!("connection");
83
84 let received_from: Bytes = socket
85 .peer_addr()
86 .ok()
87 .and_then(|addr| {
88 addr.as_pathname().map(|e| e.to_owned()).map({
89 |path| {
90 span.record("peer_path", field::debug(&path));
91 path.to_string_lossy().into_owned().into()
92 }
93 })
94 })
95 .unwrap_or_else(|| UNNAMED_SOCKET_HOST.into());
100
101 let handle_events = handle_events.clone();
102
103 let bytes_received = bytes_received.clone();
104 let stream = socket
105 .after_read(move |byte_size| {
106 bytes_received.emit(ByteSize(byte_size));
107 })
108 .allow_read_until(shutdown.clone().map(|_| ()));
109 let mut stream = FramedRead::new(stream, decoder.clone());
110
111 let connection_open = connection_open.clone();
112 let mut out = out.clone();
113 tokio::spawn(
114 async move {
115 let _open_token = connection_open.open(|count| emit!(ConnectionOpen { count }));
116
117 while let Some(result) = stream.next().await {
118 match result {
119 Ok((frame, _byte_size)) => {
120 let mut events = frame.into();
121
122 emit!(SocketEventsReceived {
123 mode: SocketMode::Unix,
124 byte_size: events.estimated_json_encoded_size_of(),
125 count: events.len(),
126 });
127
128 handle_events(&mut events, Some(received_from.clone()));
129
130 let count = events.len();
131 if (out.send_batch(events).await).is_err() {
132 emit!(StreamClosedError { count });
133 }
134 }
135 Err(error) => {
136 emit!(UnixSocketError {
137 error: &error,
138 path: &listen_path
139 });
140
141 if !error.can_continue() {
142 break;
143 }
144 }
145 }
146 }
147
148 info!("Finished sending.");
149
150 let socket: &mut UnixStream = stream.get_mut().get_mut().get_mut_ref();
151 if let Err(error) = socket.shutdown().await {
152 error!(message = "Failed shutting down socket.", %error);
153 }
154 }
155 .instrument(span.or_current()),
156 );
157 }
158
159 while connection_open.any_open() {
161 sleep(Duration::from_millis(10)).await;
162 }
163
164 if let Err(error) = remove_file(&listen_path) {
166 emit!(UnixSocketFileDeleteError {
167 path: &listen_path,
168 error
169 });
170 }
171
172 Ok(())
173 }))
174}