vector/sinks/util/service/net/
unix.rs

1use std::path::{Path, PathBuf};
2
3use snafu::ResultExt;
4use tokio::net::{UnixDatagram, UnixStream};
5
6use vector_lib::configurable::configurable_component;
7
8use crate::{net, sinks::util::unix::UnixEither};
9
10use super::{net_error::*, ConnectorType, NetError, NetworkConnector};
11
12/// Unix socket modes.
13#[configurable_component]
14#[derive(Clone, Copy, Debug)]
15pub enum UnixMode {
16    /// Datagram-oriented (`SOCK_DGRAM`).
17    Datagram,
18
19    /// Stream-oriented (`SOCK_STREAM`).
20    Stream,
21}
22
23/// Unix Domain Socket configuration.
24#[configurable_component]
25#[derive(Clone, Debug)]
26pub struct UnixConnectorConfig {
27    /// The Unix socket path.
28    ///
29    /// This should be an absolute path.
30    #[configurable(metadata(docs::examples = "/path/to/socket"))]
31    path: PathBuf,
32
33    /// The Unix socket mode to use.
34    #[serde(default = "default_unix_mode")]
35    unix_mode: UnixMode,
36
37    /// The size of the socket's send buffer.
38    ///
39    /// If set, the value of the setting is passed via the `SO_SNDBUF` option.
40    #[configurable(metadata(docs::type_unit = "bytes"))]
41    #[configurable(metadata(docs::examples = 65536))]
42    send_buffer_size: Option<usize>,
43}
44
45const fn default_unix_mode() -> UnixMode {
46    UnixMode::Stream
47}
48
49impl UnixConnectorConfig {
50    pub fn from_path<P: AsRef<Path>>(path: P) -> Self {
51        Self {
52            path: path.as_ref().to_path_buf(),
53            unix_mode: UnixMode::Stream,
54            send_buffer_size: None,
55        }
56    }
57
58    /// Creates a [`NetworkConnector`] from this Unix Domain Socket connector configuration.
59    pub fn as_connector(&self) -> NetworkConnector {
60        NetworkConnector {
61            inner: ConnectorType::Unix(UnixConnector {
62                path: self.path.clone(),
63                mode: self.unix_mode,
64                send_buffer_size: self.send_buffer_size,
65            }),
66        }
67    }
68}
69
70#[derive(Clone)]
71pub(super) struct UnixConnector {
72    path: PathBuf,
73    mode: UnixMode,
74    send_buffer_size: Option<usize>,
75}
76
77impl UnixConnector {
78    pub(super) async fn connect(&self) -> Result<(PathBuf, UnixEither), NetError> {
79        let either_socket = match self.mode {
80            UnixMode::Datagram => {
81                UnixDatagram::unbound()
82                    .context(FailedToBind)
83                    .and_then(|datagram| {
84                        datagram
85                            .connect(&self.path)
86                            .context(FailedToConnect)
87                            .map(|_| UnixEither::Datagram(datagram))
88                    })?
89            }
90            UnixMode::Stream => UnixStream::connect(&self.path)
91                .await
92                .context(FailedToConnect)
93                .map(UnixEither::Stream)?,
94        };
95
96        if let Some(send_buffer_size) = self.send_buffer_size {
97            if let Err(error) = net::set_send_buffer_size(&either_socket, send_buffer_size) {
98                warn!(%error, "Failed configuring send buffer size on Unix socket.");
99            }
100        }
101
102        Ok((self.path.clone(), either_socket))
103    }
104}