1use std::{
2 io,
3 os::fd::{AsFd, BorrowedFd},
4 path::PathBuf,
5 pin::Pin,
6};
7
8use async_trait::async_trait;
9use bytes::{Bytes, BytesMut};
10use futures::{SinkExt, StreamExt, stream::BoxStream};
11use snafu::{ResultExt, Snafu};
12use tokio::{
13 io::AsyncWriteExt,
14 net::{UnixDatagram, UnixStream},
15 time::sleep,
16};
17use tokio_util::codec::Encoder;
18use vector_lib::{
19 ByteSizeOf, EstimatedJsonEncodedSizeOf,
20 configurable::configurable_component,
21 internal_event::{BytesSent, Protocol},
22 json_size::JsonSize,
23};
24
25use super::datagram::{DatagramSocket, send_datagrams};
26use crate::{
27 codecs::Transformer,
28 common::backoff::ExponentialBackoff,
29 event::{Event, Finalizable},
30 internal_events::{
31 ConnectionOpen, OpenGauge, SocketMode, UnixSocketConnectionEstablished,
32 UnixSocketOutgoingConnectionError, UnixSocketSendError,
33 },
34 sink_ext::VecSinkExt,
35 sinks::{
36 Healthcheck, VectorSink,
37 util::{
38 EncodedEvent, StreamSink,
39 service::net::UnixMode,
40 socket_bytes_sink::{BytesSink, ShutdownCheck},
41 },
42 },
43};
44
45#[derive(Debug, Snafu)]
46pub enum UnixError {
47 #[snafu(display("Failed connecting to socket at path {}: {}", path.display(), source))]
48 ConnectionError {
49 source: tokio::io::Error,
50 path: PathBuf,
51 },
52
53 #[snafu(display("Failed to bind socket: {}.", source))]
54 FailedToBind { source: std::io::Error },
55}
56
#[configurable_component]
/// Configuration for a sink that writes to a Unix domain socket.
#[derive(Clone, Debug)]
pub struct UnixSinkConfig {
    /// The path of the Unix domain socket to connect to.
    #[configurable(metadata(docs::examples = "/path/to/socket"))]
    pub path: PathBuf,
}
67
68impl UnixSinkConfig {
69 pub const fn new(path: PathBuf) -> Self {
70 Self { path }
71 }
72
73 pub fn build(
74 &self,
75 transformer: Transformer,
76 encoder: impl Encoder<Event, Error = vector_lib::codecs::encoding::Error>
77 + Clone
78 + Send
79 + Sync
80 + 'static,
81 unix_mode: UnixMode,
82 ) -> crate::Result<(VectorSink, Healthcheck)> {
83 let connector = UnixConnector::new(self.path.clone(), unix_mode);
84 let sink = UnixSink::new(connector.clone(), transformer, encoder);
85 Ok((
86 VectorSink::from_event_streamsink(sink),
87 Box::pin(async move { connector.healthcheck().await }),
88 ))
89 }
90}
91
92pub enum UnixEither {
93 Datagram(UnixDatagram),
94 Stream(UnixStream),
95}
96
97impl UnixEither {
98 pub(super) async fn send(&mut self, buf: &[u8]) -> io::Result<usize> {
99 match self {
100 Self::Datagram(datagram) => datagram.send(buf).await,
101 Self::Stream(stream) => stream.write_all(buf).await.map(|_| buf.len()),
102 }
103 }
104}
105
106impl AsFd for UnixEither {
107 fn as_fd(&self) -> BorrowedFd<'_> {
108 match self {
109 Self::Datagram(datagram) => datagram.as_fd(),
110 Self::Stream(stream) => stream.as_fd(),
111 }
112 }
113}
114
115#[derive(Debug, Clone)]
116struct UnixConnector {
117 pub path: PathBuf,
118 mode: UnixMode,
119}
120
121impl UnixConnector {
122 const fn new(path: PathBuf, mode: UnixMode) -> Self {
123 Self { path, mode }
124 }
125
126 fn fresh_backoff() -> ExponentialBackoff {
127 ExponentialBackoff::default()
129 }
130
131 async fn connect(&self) -> Result<UnixEither, UnixError> {
132 match self.mode {
133 UnixMode::Stream => UnixStream::connect(&self.path)
134 .await
135 .context(ConnectionSnafu {
136 path: self.path.clone(),
137 })
138 .map(UnixEither::Stream),
139 UnixMode::Datagram => {
140 UnixDatagram::unbound()
141 .context(FailedToBindSnafu)
142 .and_then(|datagram| {
143 datagram
144 .connect(&self.path)
145 .context(ConnectionSnafu {
146 path: self.path.clone(),
147 })
148 .map(|_| UnixEither::Datagram(datagram))
149 })
150 }
151 }
152 }
153
154 async fn connect_backoff(&self) -> UnixEither {
155 let mut backoff = Self::fresh_backoff();
156 loop {
157 match self.connect().await {
158 Ok(stream) => {
159 emit!(UnixSocketConnectionEstablished { path: &self.path });
160 return stream;
161 }
162 Err(error) => {
163 emit!(UnixSocketOutgoingConnectionError { error });
164 sleep(backoff.next().unwrap()).await;
165 }
166 }
167 }
168 }
169
170 async fn healthcheck(&self) -> crate::Result<()> {
171 self.connect().await.map(|_| ()).map_err(Into::into)
172 }
173}
174
175struct UnixSink<E>
176where
177 E: Encoder<Event, Error = vector_lib::codecs::encoding::Error> + Clone + Send + Sync,
178{
179 connector: UnixConnector,
180 transformer: Transformer,
181 encoder: E,
182}
183
184impl<E> UnixSink<E>
185where
186 E: Encoder<Event, Error = vector_lib::codecs::encoding::Error> + Clone + Send + Sync,
187{
188 pub const fn new(connector: UnixConnector, transformer: Transformer, encoder: E) -> Self {
189 Self {
190 connector,
191 transformer,
192 encoder,
193 }
194 }
195
196 async fn connect(&mut self) -> BytesSink<UnixStream> {
197 let stream = match self.connector.connect_backoff().await {
198 UnixEither::Stream(stream) => stream,
199 UnixEither::Datagram(_) => unreachable!("connect is only called with Stream mode"),
200 };
201 BytesSink::new(stream, |_| ShutdownCheck::Alive, SocketMode::Unix)
202 }
203
204 async fn run_internal(self: Box<Self>, input: BoxStream<'_, Event>) -> Result<(), ()> {
205 match self.connector.mode {
206 UnixMode::Stream => self.run_stream(input).await,
207 UnixMode::Datagram => self.run_datagram(input).await,
208 }
209 }
210
211 async fn run_stream(mut self: Box<Self>, input: BoxStream<'_, Event>) -> Result<(), ()> {
213 let mut encoder = self.encoder.clone();
214 let transformer = self.transformer.clone();
215 let mut input = input
216 .map(|mut event| {
217 let byte_size = event.size_of();
218 let json_byte_size = event.estimated_json_encoded_size_of();
219
220 transformer.transform(&mut event);
221
222 let finalizers = event.take_finalizers();
223 let mut bytes = BytesMut::new();
224
225 if encoder.encode(event, &mut bytes).is_ok() {
227 let item = bytes.freeze();
228 EncodedEvent {
229 item,
230 finalizers,
231 byte_size,
232 json_byte_size,
233 }
234 } else {
235 EncodedEvent::new(Bytes::new(), 0, JsonSize::zero())
236 }
237 })
238 .peekable();
239
240 while Pin::new(&mut input).peek().await.is_some() {
241 let mut sink = self.connect().await;
242 let _open_token = OpenGauge::new().open(|count| emit!(ConnectionOpen { count }));
243
244 let result = match sink.send_all_peekable(&mut (&mut input).peekable()).await {
245 Ok(()) => sink.close().await,
246 Err(error) => Err(error),
247 };
248
249 if let Err(error) = result {
250 emit!(UnixSocketSendError {
251 error: &error,
252 path: &self.connector.path
253 });
254 }
255 }
256
257 Ok(())
258 }
259
260 async fn run_datagram(self: Box<Self>, input: BoxStream<'_, Event>) -> Result<(), ()> {
261 let bytes_sent = register!(BytesSent::from(Protocol::UNIX));
262 let mut input = input.peekable();
263
264 let mut encoder = self.encoder.clone();
265 while Pin::new(&mut input).peek().await.is_some() {
266 let socket = match self.connector.connect_backoff().await {
267 UnixEither::Datagram(datagram) => datagram,
268 UnixEither::Stream(_) => {
269 unreachable!("run_datagram is only called with Datagram mode")
270 }
271 };
272
273 send_datagrams(
274 &mut input,
275 DatagramSocket::Unix(socket, self.connector.path.clone()),
276 &self.transformer,
277 &mut encoder,
278 &None,
279 &bytes_sent,
280 )
281 .await;
282 }
283
284 Ok(())
285 }
286}
287
288#[async_trait]
289impl<E> StreamSink<Event> for UnixSink<E>
290where
291 E: Encoder<Event, Error = vector_lib::codecs::encoding::Error> + Clone + Send + Sync,
292{
293 async fn run(mut self: Box<Self>, input: BoxStream<'_, Event>) -> Result<(), ()> {
294 self.run_internal(input).await
295 }
296}
297
#[cfg(test)]
mod tests {
    use tokio::net::UnixListener;
    use vector_lib::codecs::{
        BytesEncoder, NewlineDelimitedEncoder, TextSerializerConfig, encoding::Framer,
    };

    use super::*;
    use crate::{
        codecs::Encoder,
        test_util::{
            CountReceiver,
            components::{SINK_TAGS, assert_sink_compliance},
            random_lines_with_stream,
        },
    };

    /// Returns a path named `name` inside a fresh temporary directory.
    ///
    /// `keep()` prevents the directory from being removed when the `TempDir` handle
    /// drops, so the path stays valid for the rest of the test.
    fn temp_uds_path(name: &str) -> PathBuf {
        tempfile::tempdir().unwrap().keep().join(name)
    }

    /// Checks the healthcheck result for each combination of socket mode and
    /// listener state.
    #[tokio::test]
    async fn unix_sink_healthcheck() {
        // A stream-mode listener is bound at `good_path` for the whole test.
        let good_path = temp_uds_path("valid_stream_uds");
        let _listener = UnixListener::bind(&good_path).unwrap();
        // Stream mode against a stream listener: the healthcheck passes.
        assert!(
            UnixSinkConfig::new(good_path.clone())
                .build(
                    Default::default(),
                    Encoder::<()>::new(TextSerializerConfig::default().build().into()),
                    UnixMode::Stream
                )
                .unwrap()
                .1
                .await
                .is_ok()
        );
        // Datagram mode against a stream listener: the socket types differ, so it fails.
        assert!(
            UnixSinkConfig::new(good_path.clone())
                .build(
                    Default::default(),
                    Encoder::<()>::new(TextSerializerConfig::default().build().into()),
                    UnixMode::Datagram
                )
                .unwrap()
                .1
                .await
                .is_err(),
            "datagram mode should fail when attempting to send into a stream mode UDS"
        );

        // Nothing is bound at `bad_path`, so both modes must fail.
        let bad_path = temp_uds_path("no_one_listening");
        assert!(
            UnixSinkConfig::new(bad_path.clone())
                .build(
                    Default::default(),
                    Encoder::<()>::new(TextSerializerConfig::default().build().into()),
                    UnixMode::Stream
                )
                .unwrap()
                .1
                .await
                .is_err()
        );
        assert!(
            UnixSinkConfig::new(bad_path.clone())
                .build(
                    Default::default(),
                    Encoder::<()>::new(TextSerializerConfig::default().build().into()),
                    UnixMode::Datagram
                )
                .unwrap()
                .1
                .await
                .is_err()
        );
    }

    /// Sends newline-framed lines over a stream socket and checks that the receiver
    /// gets exactly the input lines.
    #[tokio::test]
    async fn basic_unix_sink() {
        let num_lines = 1000;
        let out_path = temp_uds_path("unix_test");

        // Start the receiver before building the sink so something is listening at `out_path`.
        let mut receiver = CountReceiver::receive_lines_unix(out_path.clone());

        let config = UnixSinkConfig::new(out_path);
        let (sink, _healthcheck) = config
            .build(
                Default::default(),
                Encoder::<Framer>::new(
                    NewlineDelimitedEncoder::default().into(),
                    TextSerializerConfig::default().build().into(),
                ),
                UnixMode::Stream,
            )
            .unwrap();

        let (input_lines, events) = random_lines_with_stream(100, num_lines, None);

        assert_sink_compliance(&SINK_TAGS, async move { sink.run(events).await })
            .await
            .expect("Running sink failed");

        // NOTE(review): presumably waits until the receiver has seen the sink's connection.
        receiver.connected().await;

        assert_eq!(input_lines, receiver.await);
    }

    /// Sends unframed lines as datagrams and checks that a blocking receiver gets
    /// exactly the input lines, one per datagram.
    // NOTE(review): ignored on macOS; the reason is not visible in this file.
    #[cfg_attr(target_os = "macos", ignore)]
    #[tokio::test]
    async fn basic_unix_datagram_sink() {
        let num_lines = 1000;
        let out_path = temp_uds_path("unix_datagram_test");

        // Bind the receiving datagram socket before the sink starts connecting to it.
        let receiver = std::os::unix::net::UnixDatagram::bind(out_path.clone()).unwrap();
        let (ready_tx, ready_rx) = tokio::sync::oneshot::channel();

        // Use blocking `recv_from` calls on a blocking-pool thread, which signals
        // readiness before it starts reading.
        let handle = tokio::task::spawn_blocking(move || {
            let mut output_lines = Vec::<String>::with_capacity(num_lines);

            ready_tx.send(()).expect("failed to signal readiness");
            for _ in 0..num_lines {
                // NOTE(review): one byte larger than the 100-length lines requested
                // below, presumably so an oversized datagram would show up — confirm.
                let mut buf = [0; 101];
                let (size, _) = receiver
                    .recv_from(&mut buf)
                    .expect("Did not receive message");
                let line = String::from_utf8_lossy(&buf[..size]).to_string();
                output_lines.push(line);
            }

            output_lines
        });
        ready_rx.await.expect("failed to receive ready signal");

        let config = UnixSinkConfig::new(out_path.clone());
        let (sink, _healthcheck) = config
            .build(
                Default::default(),
                Encoder::<Framer>::new(
                    BytesEncoder.into(),
                    TextSerializerConfig::default().build().into(),
                ),
                UnixMode::Datagram,
            )
            .unwrap();

        let (input_lines, events) = random_lines_with_stream(100, num_lines, None);

        assert_sink_compliance(&SINK_TAGS, async move { sink.run(events).await })
            .await
            .expect("Running sink failed");

        // The receiver thread returns once it has read `num_lines` datagrams.
        let output_lines = handle.await.expect("UDS Datagram receiver failed");

        assert_eq!(input_lines, output_lines);
    }
}