vector/sinks/util/service/net/
unix.rs1use std::path::{Path, PathBuf};
2
3use snafu::ResultExt;
4use tokio::net::{UnixDatagram, UnixStream};
5
6use vector_lib::configurable::configurable_component;
7
8use crate::{net, sinks::util::unix::UnixEither};
9
10use super::{net_error::*, ConnectorType, NetError, NetworkConnector};
11
12#[configurable_component]
14#[derive(Clone, Copy, Debug)]
15pub enum UnixMode {
16 Datagram,
18
19 Stream,
21}
22
23#[configurable_component]
25#[derive(Clone, Debug)]
26pub struct UnixConnectorConfig {
27 #[configurable(metadata(docs::examples = "/path/to/socket"))]
31 path: PathBuf,
32
33 #[serde(default = "default_unix_mode")]
35 unix_mode: UnixMode,
36
37 #[configurable(metadata(docs::type_unit = "bytes"))]
41 #[configurable(metadata(docs::examples = 65536))]
42 send_buffer_size: Option<usize>,
43}
44
45const fn default_unix_mode() -> UnixMode {
46 UnixMode::Stream
47}
48
49impl UnixConnectorConfig {
50 pub fn from_path<P: AsRef<Path>>(path: P) -> Self {
51 Self {
52 path: path.as_ref().to_path_buf(),
53 unix_mode: UnixMode::Stream,
54 send_buffer_size: None,
55 }
56 }
57
58 pub fn as_connector(&self) -> NetworkConnector {
60 NetworkConnector {
61 inner: ConnectorType::Unix(UnixConnector {
62 path: self.path.clone(),
63 mode: self.unix_mode,
64 send_buffer_size: self.send_buffer_size,
65 }),
66 }
67 }
68}
69
70#[derive(Clone)]
71pub(super) struct UnixConnector {
72 path: PathBuf,
73 mode: UnixMode,
74 send_buffer_size: Option<usize>,
75}
76
77impl UnixConnector {
78 pub(super) async fn connect(&self) -> Result<(PathBuf, UnixEither), NetError> {
79 let either_socket = match self.mode {
80 UnixMode::Datagram => {
81 UnixDatagram::unbound()
82 .context(FailedToBind)
83 .and_then(|datagram| {
84 datagram
85 .connect(&self.path)
86 .context(FailedToConnect)
87 .map(|_| UnixEither::Datagram(datagram))
88 })?
89 }
90 UnixMode::Stream => UnixStream::connect(&self.path)
91 .await
92 .context(FailedToConnect)
93 .map(UnixEither::Stream)?,
94 };
95
96 if let Some(send_buffer_size) = self.send_buffer_size {
97 if let Err(error) = net::set_send_buffer_size(&either_socket, send_buffer_size) {
98 warn!(%error, "Failed configuring send buffer size on Unix socket.");
99 }
100 }
101
102 Ok((self.path.clone(), either_socket))
103 }
104}