vector/sources/util/
unix_stream.rs1use std::{fs::remove_file, path::PathBuf, time::Duration};
2
3use bytes::Bytes;
4use futures::{FutureExt, StreamExt};
5use smallvec::SmallVec;
6use tokio::{
7 io::AsyncWriteExt,
8 net::{UnixListener, UnixStream},
9 time::sleep,
10};
11use tokio_stream::wrappers::UnixListenerStream;
12use tokio_util::codec::FramedRead;
13use tracing::{field, Instrument};
14use vector_lib::codecs::StreamDecodingError;
15use vector_lib::internal_event::{ByteSize, BytesReceived, InternalEventHandle as _, Protocol};
16use vector_lib::EstimatedJsonEncodedSizeOf;
17
18use super::AfterReadExt;
19use crate::{
20 async_read::VecAsyncReadExt,
21 event::Event,
22 internal_events::{
23 ConnectionOpen, OpenGauge, SocketEventsReceived, SocketMode, StreamClosedError,
24 UnixSocketError, UnixSocketFileDeleteError,
25 },
26 shutdown::ShutdownSignal,
27 sources::util::change_socket_permissions,
28 sources::util::unix::UNNAMED_SOCKET_HOST,
29 sources::Source,
30 SourceSender,
31};
32
33pub fn build_unix_stream_source<D, F, E>(
38 listen_path: PathBuf,
39 socket_file_mode: Option<u32>,
40 decoder: D,
41 handle_events: impl Fn(&mut [Event], Option<Bytes>) + Clone + Send + Sync + 'static,
42 shutdown: ShutdownSignal,
43 out: SourceSender,
44) -> crate::Result<Source>
45where
46 D: tokio_util::codec::Decoder<Item = (F, usize), Error = E> + Clone + Send + 'static,
47 E: StreamDecodingError + std::fmt::Display + Send,
48 F: Into<SmallVec<[Event; 1]>> + Send,
49{
50 Ok(Box::pin(async move {
51 let listener = UnixListener::bind(&listen_path).unwrap_or_else(|e| {
52 panic!(
53 "Failed to bind to listener socket at path: {}. Err: {}",
54 listen_path.to_string_lossy(),
55 e
56 )
57 });
58 info!(message = "Listening.", path = ?listen_path, r#type = "unix");
59
60 change_socket_permissions(&listen_path, socket_file_mode)
61 .expect("Failed to set socket permissions");
62
63 let bytes_received = register!(BytesReceived::from(Protocol::UNIX));
64
65 let connection_open = OpenGauge::new();
66 let stream = UnixListenerStream::new(listener).take_until(shutdown.clone());
67 tokio::pin!(stream);
68 while let Some(socket) = stream.next().await {
69 let socket = match socket {
70 Err(error) => {
71 error!(message = "Failed to accept socket.", %error);
72 continue;
73 }
74 Ok(socket) => socket,
75 };
76
77 let listen_path = listen_path.clone();
78
79 let span = info_span!("connection");
80
81 let received_from: Bytes = socket
82 .peer_addr()
83 .ok()
84 .and_then(|addr| {
85 addr.as_pathname().map(|e| e.to_owned()).map({
86 |path| {
87 span.record("peer_path", field::debug(&path));
88 path.to_string_lossy().into_owned().into()
89 }
90 })
91 })
92 .unwrap_or_else(|| UNNAMED_SOCKET_HOST.into());
97
98 let handle_events = handle_events.clone();
99
100 let bytes_received = bytes_received.clone();
101 let stream = socket
102 .after_read(move |byte_size| {
103 bytes_received.emit(ByteSize(byte_size));
104 })
105 .allow_read_until(shutdown.clone().map(|_| ()));
106 let mut stream = FramedRead::new(stream, decoder.clone());
107
108 let connection_open = connection_open.clone();
109 let mut out = out.clone();
110 tokio::spawn(
111 async move {
112 let _open_token = connection_open.open(|count| emit!(ConnectionOpen { count }));
113
114 while let Some(result) = stream.next().await {
115 match result {
116 Ok((frame, _byte_size)) => {
117 let mut events = frame.into();
118
119 emit!(SocketEventsReceived {
120 mode: SocketMode::Unix,
121 byte_size: events.estimated_json_encoded_size_of(),
122 count: events.len(),
123 });
124
125 handle_events(&mut events, Some(received_from.clone()));
126
127 let count = events.len();
128 if (out.send_batch(events).await).is_err() {
129 emit!(StreamClosedError { count });
130 }
131 }
132 Err(error) => {
133 emit!(UnixSocketError {
134 error: &error,
135 path: &listen_path
136 });
137
138 if !error.can_continue() {
139 break;
140 }
141 }
142 }
143 }
144
145 info!("Finished sending.");
146
147 let socket: &mut UnixStream = stream.get_mut().get_mut().get_mut_ref();
148 if let Err(error) = socket.shutdown().await {
149 error!(message = "Failed shutting down socket.", %error);
150 }
151 }
152 .instrument(span.or_current()),
153 );
154 }
155
156 while connection_open.any_open() {
158 sleep(Duration::from_millis(10)).await;
159 }
160
161 if let Err(error) = remove_file(&listen_path) {
163 emit!(UnixSocketFileDeleteError {
164 path: &listen_path,
165 error
166 });
167 }
168
169 Ok(())
170 }))
171}