1use std::{
2 io,
3 os::fd::{AsFd, BorrowedFd},
4 path::PathBuf,
5 pin::Pin,
6 time::Duration,
7};
8
9use async_trait::async_trait;
10use bytes::{Bytes, BytesMut};
11use futures::{stream::BoxStream, SinkExt, StreamExt};
12use snafu::{ResultExt, Snafu};
13use tokio::{
14 io::AsyncWriteExt,
15 net::{UnixDatagram, UnixStream},
16 time::sleep,
17};
18use tokio_util::codec::Encoder;
19use vector_lib::json_size::JsonSize;
20use vector_lib::{
21 configurable::configurable_component,
22 internal_event::{BytesSent, Protocol},
23};
24use vector_lib::{ByteSizeOf, EstimatedJsonEncodedSizeOf};
25
26use crate::{
27 codecs::Transformer,
28 common::backoff::ExponentialBackoff,
29 event::{Event, Finalizable},
30 internal_events::{
31 ConnectionOpen, OpenGauge, SocketMode, UnixSocketConnectionEstablished,
32 UnixSocketOutgoingConnectionError, UnixSocketSendError,
33 },
34 sink_ext::VecSinkExt,
35 sinks::{
36 util::{
37 service::net::UnixMode,
38 socket_bytes_sink::{BytesSink, ShutdownCheck},
39 EncodedEvent, StreamSink,
40 },
41 Healthcheck, VectorSink,
42 },
43};
44
45use super::datagram::{send_datagrams, DatagramSocket};
46
47#[derive(Debug, Snafu)]
48pub enum UnixError {
49 #[snafu(display("Failed connecting to socket at path {}: {}", path.display(), source))]
50 ConnectionError {
51 source: tokio::io::Error,
52 path: PathBuf,
53 },
54
55 #[snafu(display("Failed to bind socket: {}.", source))]
56 FailedToBind { source: std::io::Error },
57}
58
59#[configurable_component]
61#[derive(Clone, Debug)]
62pub struct UnixSinkConfig {
63 #[configurable(metadata(docs::examples = "/path/to/socket"))]
67 pub path: PathBuf,
68}
69
70impl UnixSinkConfig {
71 pub const fn new(path: PathBuf) -> Self {
72 Self { path }
73 }
74
75 pub fn build(
76 &self,
77 transformer: Transformer,
78 encoder: impl Encoder<Event, Error = vector_lib::codecs::encoding::Error>
79 + Clone
80 + Send
81 + Sync
82 + 'static,
83 unix_mode: UnixMode,
84 ) -> crate::Result<(VectorSink, Healthcheck)> {
85 let connector = UnixConnector::new(self.path.clone(), unix_mode);
86 let sink = UnixSink::new(connector.clone(), transformer, encoder);
87 Ok((
88 VectorSink::from_event_streamsink(sink),
89 Box::pin(async move { connector.healthcheck().await }),
90 ))
91 }
92}
93
94pub enum UnixEither {
95 Datagram(UnixDatagram),
96 Stream(UnixStream),
97}
98
99impl UnixEither {
100 pub(super) async fn send(&mut self, buf: &[u8]) -> io::Result<usize> {
101 match self {
102 Self::Datagram(datagram) => datagram.send(buf).await,
103 Self::Stream(stream) => stream.write_all(buf).await.map(|_| buf.len()),
104 }
105 }
106}
107
108impl AsFd for UnixEither {
109 fn as_fd(&self) -> BorrowedFd<'_> {
110 match self {
111 Self::Datagram(datagram) => datagram.as_fd(),
112 Self::Stream(stream) => stream.as_fd(),
113 }
114 }
115}
116
117#[derive(Debug, Clone)]
118struct UnixConnector {
119 pub path: PathBuf,
120 mode: UnixMode,
121}
122
123impl UnixConnector {
124 const fn new(path: PathBuf, mode: UnixMode) -> Self {
125 Self { path, mode }
126 }
127
128 const fn fresh_backoff() -> ExponentialBackoff {
129 ExponentialBackoff::from_millis(2)
131 .factor(250)
132 .max_delay(Duration::from_secs(60))
133 }
134
135 async fn connect(&self) -> Result<UnixEither, UnixError> {
136 match self.mode {
137 UnixMode::Stream => UnixStream::connect(&self.path)
138 .await
139 .context(ConnectionSnafu {
140 path: self.path.clone(),
141 })
142 .map(UnixEither::Stream),
143 UnixMode::Datagram => {
144 UnixDatagram::unbound()
145 .context(FailedToBindSnafu)
146 .and_then(|datagram| {
147 datagram
148 .connect(&self.path)
149 .context(ConnectionSnafu {
150 path: self.path.clone(),
151 })
152 .map(|_| UnixEither::Datagram(datagram))
153 })
154 }
155 }
156 }
157
158 async fn connect_backoff(&self) -> UnixEither {
159 let mut backoff = Self::fresh_backoff();
160 loop {
161 match self.connect().await {
162 Ok(stream) => {
163 emit!(UnixSocketConnectionEstablished { path: &self.path });
164 return stream;
165 }
166 Err(error) => {
167 emit!(UnixSocketOutgoingConnectionError { error });
168 sleep(backoff.next().unwrap()).await;
169 }
170 }
171 }
172 }
173
174 async fn healthcheck(&self) -> crate::Result<()> {
175 self.connect().await.map(|_| ()).map_err(Into::into)
176 }
177}
178
179struct UnixSink<E>
180where
181 E: Encoder<Event, Error = vector_lib::codecs::encoding::Error> + Clone + Send + Sync,
182{
183 connector: UnixConnector,
184 transformer: Transformer,
185 encoder: E,
186}
187
188impl<E> UnixSink<E>
189where
190 E: Encoder<Event, Error = vector_lib::codecs::encoding::Error> + Clone + Send + Sync,
191{
192 pub const fn new(connector: UnixConnector, transformer: Transformer, encoder: E) -> Self {
193 Self {
194 connector,
195 transformer,
196 encoder,
197 }
198 }
199
200 async fn connect(&mut self) -> BytesSink<UnixStream> {
201 let stream = match self.connector.connect_backoff().await {
202 UnixEither::Stream(stream) => stream,
203 UnixEither::Datagram(_) => unreachable!("connect is only called with Stream mode"),
204 };
205 BytesSink::new(stream, |_| ShutdownCheck::Alive, SocketMode::Unix)
206 }
207
208 async fn run_internal(self: Box<Self>, input: BoxStream<'_, Event>) -> Result<(), ()> {
209 match self.connector.mode {
210 UnixMode::Stream => self.run_stream(input).await,
211 UnixMode::Datagram => self.run_datagram(input).await,
212 }
213 }
214
215 async fn run_stream(mut self: Box<Self>, input: BoxStream<'_, Event>) -> Result<(), ()> {
217 let mut encoder = self.encoder.clone();
218 let transformer = self.transformer.clone();
219 let mut input = input
220 .map(|mut event| {
221 let byte_size = event.size_of();
222 let json_byte_size = event.estimated_json_encoded_size_of();
223
224 transformer.transform(&mut event);
225
226 let finalizers = event.take_finalizers();
227 let mut bytes = BytesMut::new();
228
229 if encoder.encode(event, &mut bytes).is_ok() {
231 let item = bytes.freeze();
232 EncodedEvent {
233 item,
234 finalizers,
235 byte_size,
236 json_byte_size,
237 }
238 } else {
239 EncodedEvent::new(Bytes::new(), 0, JsonSize::zero())
240 }
241 })
242 .peekable();
243
244 while Pin::new(&mut input).peek().await.is_some() {
245 let mut sink = self.connect().await;
246 let _open_token = OpenGauge::new().open(|count| emit!(ConnectionOpen { count }));
247
248 let result = match sink.send_all_peekable(&mut (&mut input).peekable()).await {
249 Ok(()) => sink.close().await,
250 Err(error) => Err(error),
251 };
252
253 if let Err(error) = result {
254 emit!(UnixSocketSendError {
255 error: &error,
256 path: &self.connector.path
257 });
258 }
259 }
260
261 Ok(())
262 }
263
264 async fn run_datagram(self: Box<Self>, input: BoxStream<'_, Event>) -> Result<(), ()> {
265 let bytes_sent = register!(BytesSent::from(Protocol::UNIX));
266 let mut input = input.peekable();
267
268 let mut encoder = self.encoder.clone();
269 while Pin::new(&mut input).peek().await.is_some() {
270 let socket = match self.connector.connect_backoff().await {
271 UnixEither::Datagram(datagram) => datagram,
272 UnixEither::Stream(_) => {
273 unreachable!("run_datagram is only called with Datagram mode")
274 }
275 };
276
277 send_datagrams(
278 &mut input,
279 DatagramSocket::Unix(socket, self.connector.path.clone()),
280 &self.transformer,
281 &mut encoder,
282 &bytes_sent,
283 )
284 .await;
285 }
286
287 Ok(())
288 }
289}
290
291#[async_trait]
292impl<E> StreamSink<Event> for UnixSink<E>
293where
294 E: Encoder<Event, Error = vector_lib::codecs::encoding::Error> + Clone + Send + Sync,
295{
296 async fn run(mut self: Box<Self>, input: BoxStream<'_, Event>) -> Result<(), ()> {
297 self.run_internal(input).await
298 }
299}
300
#[cfg(test)]
mod tests {
    use tokio::net::UnixListener;
    use vector_lib::codecs::{
        encoding::Framer, BytesEncoder, NewlineDelimitedEncoder, TextSerializerConfig,
    };

    use super::*;
    use crate::{
        codecs::Encoder,
        test_util::{
            components::{assert_sink_compliance, SINK_TAGS},
            random_lines_with_stream, CountReceiver,
        },
    };

    /// Returns `name` inside a freshly created temporary directory that is
    /// kept on disk.
    fn temp_uds_path(name: &str) -> PathBuf {
        tempfile::tempdir().unwrap().keep().join(name)
    }

    /// Builds a text-encoding sink for `path` in `mode` and returns the
    /// outcome of its healthcheck.
    async fn healthcheck_outcome(path: PathBuf, mode: UnixMode) -> crate::Result<()> {
        let (_sink, healthcheck) = UnixSinkConfig::new(path)
            .build(
                Default::default(),
                Encoder::<()>::new(TextSerializerConfig::default().build().into()),
                mode,
            )
            .unwrap();

        healthcheck.await
    }

    #[tokio::test]
    async fn unix_sink_healthcheck() {
        // A stream listener is bound at this path for the duration of the test.
        let good_path = temp_uds_path("valid_stream_uds");
        let _listener = UnixListener::bind(&good_path).unwrap();

        assert!(healthcheck_outcome(good_path.clone(), UnixMode::Stream)
            .await
            .is_ok());
        assert!(
            healthcheck_outcome(good_path.clone(), UnixMode::Datagram)
                .await
                .is_err(),
            "datagram mode should fail when attempting to send into a stream mode UDS"
        );

        // Nothing is bound at this path, so both modes must fail.
        let bad_path = temp_uds_path("no_one_listening");

        assert!(healthcheck_outcome(bad_path.clone(), UnixMode::Stream)
            .await
            .is_err());
        assert!(healthcheck_outcome(bad_path.clone(), UnixMode::Datagram)
            .await
            .is_err());
    }

    #[tokio::test]
    async fn basic_unix_sink() {
        let num_lines = 1000;
        let out_path = temp_uds_path("unix_test");

        // Receiver for the lines the sink writes.
        let mut receiver = CountReceiver::receive_lines_unix(out_path.clone());

        // Newline-delimited text sink in stream mode.
        let framed_encoder = Encoder::<Framer>::new(
            NewlineDelimitedEncoder::default().into(),
            TextSerializerConfig::default().build().into(),
        );
        let (sink, _healthcheck) = UnixSinkConfig::new(out_path)
            .build(Default::default(), framed_encoder, UnixMode::Stream)
            .unwrap();

        // Push the test data through the sink.
        let (input_lines, events) = random_lines_with_stream(100, num_lines, None);

        assert_sink_compliance(&SINK_TAGS, async move { sink.run(events).await })
            .await
            .expect("Running sink failed");

        // Wait for the receiver to report a connection, then compare the
        // received lines with what was sent.
        receiver.connected().await;

        assert_eq!(input_lines, receiver.await);
    }

    #[cfg_attr(target_os = "macos", ignore)]
    #[tokio::test]
    async fn basic_unix_datagram_sink() {
        let num_lines: usize = 1000;
        let out_path = temp_uds_path("unix_datagram_test");

        // Blocking receiver that reads exactly `num_lines` datagrams.
        let receiver = std::os::unix::net::UnixDatagram::bind(out_path.clone()).unwrap();
        let (ready_tx, ready_rx) = tokio::sync::oneshot::channel();

        let handle = tokio::task::spawn_blocking(move || {
            ready_tx.send(()).expect("failed to signal readiness");

            (0..num_lines)
                .map(|_| {
                    let mut buf = [0; 101];
                    let (size, _) = receiver
                        .recv_from(&mut buf)
                        .expect("Did not receive message");
                    String::from_utf8_lossy(&buf[..size]).to_string()
                })
                .collect::<Vec<String>>()
        });
        ready_rx.await.expect("failed to receive ready signal");

        // Text sink in datagram mode using the bytes framer.
        let datagram_encoder = Encoder::<Framer>::new(
            BytesEncoder.into(),
            TextSerializerConfig::default().build().into(),
        );
        let (sink, _healthcheck) = UnixSinkConfig::new(out_path.clone())
            .build(Default::default(), datagram_encoder, UnixMode::Datagram)
            .unwrap();

        // Push the test data through the sink.
        let (input_lines, events) = random_lines_with_stream(100, num_lines, None);

        assert_sink_compliance(&SINK_TAGS, async move { sink.run(events).await })
            .await
            .expect("Running sink failed");

        // Collect what the receiver thread read and compare with what was sent.
        let output_lines = handle.await.expect("UDS Datagram receiver failed");

        assert_eq!(input_lines, output_lines);
    }
}