vector/sources/util/
unix_stream.rs1use std::{fs::remove_file, path::PathBuf, time::Duration};
2
3use bytes::Bytes;
4use futures::{FutureExt, StreamExt};
5use smallvec::SmallVec;
6use tokio::{
7 io::AsyncWriteExt,
8 net::{UnixListener, UnixStream},
9 time::sleep,
10};
11use tokio_stream::wrappers::UnixListenerStream;
12use tracing::{Instrument, field};
13use vector_lib::{
14 EstimatedJsonEncodedSizeOf,
15 codecs::{DecoderFramedRead, StreamDecodingError},
16 internal_event::{ByteSize, BytesReceived, InternalEventHandle as _, Protocol},
17};
18
19use super::AfterReadExt;
20use crate::{
21 SourceSender,
22 async_read::VecAsyncReadExt,
23 event::Event,
24 internal_events::{
25 ConnectionOpen, OpenGauge, SocketEventsReceived, SocketMode, StreamClosedError,
26 UnixSocketError, UnixSocketFileDeleteError,
27 },
28 shutdown::ShutdownSignal,
29 sources::{
30 Source,
31 util::{change_socket_permissions, unix::UNNAMED_SOCKET_HOST},
32 },
33};
34
35pub fn build_unix_stream_source<D, F, E>(
40 listen_path: PathBuf,
41 socket_file_mode: Option<u32>,
42 decoder: D,
43 handle_events: impl Fn(&mut [Event], Option<Bytes>) + Clone + Send + Sync + 'static,
44 shutdown: ShutdownSignal,
45 out: SourceSender,
46) -> crate::Result<Source>
47where
48 D: tokio_util::codec::Decoder<Item = (F, usize), Error = E> + Clone + Send + 'static,
49 E: StreamDecodingError + std::fmt::Display + Send + From<std::io::Error>,
50 F: Into<SmallVec<[Event; 1]>> + Send,
51{
52 Ok(Box::pin(async move {
53 let listener = UnixListener::bind(&listen_path).unwrap_or_else(|e| {
54 panic!(
55 "Failed to bind to listener socket at path: {}. Err: {}",
56 listen_path.to_string_lossy(),
57 e
58 )
59 });
60 info!(message = "Listening.", path = ?listen_path, r#type = "unix");
61
62 change_socket_permissions(&listen_path, socket_file_mode)
63 .expect("Failed to set socket permissions");
64
65 let bytes_received = register!(BytesReceived::from(Protocol::UNIX));
66
67 let connection_open = OpenGauge::new();
68 let stream = UnixListenerStream::new(listener).take_until(shutdown.clone());
69 tokio::pin!(stream);
70 while let Some(socket) = stream.next().await {
71 let socket = match socket {
72 Err(error) => {
73 error!(message = "Failed to accept socket.", %error);
74 continue;
75 }
76 Ok(socket) => socket,
77 };
78
79 let listen_path = listen_path.clone();
80
81 let span = info_span!("connection");
82
83 let received_from: Bytes = socket
84 .peer_addr()
85 .ok()
86 .and_then(|addr| {
87 addr.as_pathname().map(|e| e.to_owned()).map({
88 |path| {
89 span.record("peer_path", field::debug(&path));
90 path.to_string_lossy().into_owned().into()
91 }
92 })
93 })
94 .unwrap_or_else(|| UNNAMED_SOCKET_HOST.into());
99
100 let handle_events = handle_events.clone();
101
102 let bytes_received = bytes_received.clone();
103 let stream = socket
104 .after_read(move |byte_size| {
105 bytes_received.emit(ByteSize(byte_size));
106 })
107 .allow_read_until(shutdown.clone().map(|_| ()));
108 let mut stream = DecoderFramedRead::new(stream, decoder.clone());
109
110 let connection_open = connection_open.clone();
111 let mut out = out.clone();
112 tokio::spawn(
113 async move {
114 let _open_token = connection_open.open(|count| emit!(ConnectionOpen { count }));
115
116 while let Some(result) = stream.next().await {
117 match result {
118 Ok((frame, _byte_size)) => {
119 let mut events = frame.into();
120
121 emit!(SocketEventsReceived {
122 mode: SocketMode::Unix,
123 byte_size: events.estimated_json_encoded_size_of(),
124 count: events.len(),
125 });
126
127 handle_events(&mut events, Some(received_from.clone()));
128
129 let count = events.len();
130 if (out.send_batch(events).await).is_err() {
131 emit!(StreamClosedError { count });
132 }
133 }
134 Err(error) => {
135 emit!(UnixSocketError {
136 error: &error,
137 path: &listen_path
138 });
139
140 if !error.can_continue() {
141 break;
142 }
143 }
144 }
145 }
146
147 info!("Finished sending.");
148
149 let socket: &mut UnixStream = stream.get_mut().get_mut().get_mut_ref();
150 if let Err(error) = socket.shutdown().await {
151 error!(message = "Failed shutting down socket.", %error);
152 }
153 }
154 .instrument(span.or_current()),
155 );
156 }
157
158 while connection_open.any_open() {
160 sleep(Duration::from_millis(10)).await;
161 }
162
163 if let Err(error) = remove_file(&listen_path) {
165 emit!(UnixSocketFileDeleteError {
166 path: &listen_path,
167 error
168 });
169 }
170
171 Ok(())
172 }))
173}