vector/sinks/util/service/net/
unix.rs

1use std::path::{Path, PathBuf};
2
3use snafu::ResultExt;
4use tokio::net::{UnixDatagram, UnixStream};
5use vector_lib::configurable::configurable_component;
6
7use super::{ConnectorType, NetError, NetworkConnector, net_error::*};
8use crate::{net, sinks::util::unix::UnixEither};
9
10/// Unix socket modes.
11#[configurable_component]
12#[derive(Clone, Copy, Debug)]
13pub enum UnixMode {
14    /// Datagram-oriented (`SOCK_DGRAM`).
15    Datagram,
16
17    /// Stream-oriented (`SOCK_STREAM`).
18    Stream,
19}
20
21/// Unix Domain Socket configuration.
22#[configurable_component]
23#[derive(Clone, Debug)]
24pub struct UnixConnectorConfig {
25    /// The Unix socket path.
26    ///
27    /// This should be an absolute path.
28    #[configurable(metadata(docs::examples = "/path/to/socket"))]
29    path: PathBuf,
30
31    /// The Unix socket mode to use.
32    #[serde(default = "default_unix_mode")]
33    unix_mode: UnixMode,
34
35    /// The size of the socket's send buffer.
36    ///
37    /// If set, the value of the setting is passed via the `SO_SNDBUF` option.
38    #[configurable(metadata(docs::type_unit = "bytes"))]
39    #[configurable(metadata(docs::examples = 65536))]
40    send_buffer_size: Option<usize>,
41}
42
43const fn default_unix_mode() -> UnixMode {
44    UnixMode::Stream
45}
46
47impl UnixConnectorConfig {
48    pub fn from_path<P: AsRef<Path>>(path: P) -> Self {
49        Self {
50            path: path.as_ref().to_path_buf(),
51            unix_mode: UnixMode::Stream,
52            send_buffer_size: None,
53        }
54    }
55
56    /// Creates a [`NetworkConnector`] from this Unix Domain Socket connector configuration.
57    pub fn as_connector(&self) -> NetworkConnector {
58        NetworkConnector {
59            inner: ConnectorType::Unix(UnixConnector {
60                path: self.path.clone(),
61                mode: self.unix_mode,
62                send_buffer_size: self.send_buffer_size,
63            }),
64        }
65    }
66}
67
68#[derive(Clone)]
69pub(super) struct UnixConnector {
70    path: PathBuf,
71    mode: UnixMode,
72    send_buffer_size: Option<usize>,
73}
74
75impl UnixConnector {
76    pub(super) async fn connect(&self) -> Result<(PathBuf, UnixEither), NetError> {
77        let either_socket = match self.mode {
78            UnixMode::Datagram => {
79                UnixDatagram::unbound()
80                    .context(FailedToBind)
81                    .and_then(|datagram| {
82                        datagram
83                            .connect(&self.path)
84                            .context(FailedToConnect)
85                            .map(|_| UnixEither::Datagram(datagram))
86                    })?
87            }
88            UnixMode::Stream => UnixStream::connect(&self.path)
89                .await
90                .context(FailedToConnect)
91                .map(UnixEither::Stream)?,
92        };
93
94        if let Some(send_buffer_size) = self.send_buffer_size
95            && let Err(error) = net::set_send_buffer_size(&either_socket, send_buffer_size)
96        {
97            warn!(%error, "Failed configuring send buffer size on Unix socket.");
98        }
99
100        Ok((self.path.clone(), either_socket))
101    }
102}