vector/sinks/util/service/net/
unix.rs1use std::path::{Path, PathBuf};
2
3use snafu::ResultExt;
4use tokio::net::{UnixDatagram, UnixStream};
5use vector_lib::configurable::configurable_component;
6
7use super::{ConnectorType, NetError, NetworkConnector, net_error::*};
8use crate::{net, sinks::util::unix::UnixEither};
9
10#[configurable_component]
12#[derive(Clone, Copy, Debug)]
13pub enum UnixMode {
14 Datagram,
16
17 Stream,
19}
20
21#[configurable_component]
23#[derive(Clone, Debug)]
24pub struct UnixConnectorConfig {
25 #[configurable(metadata(docs::examples = "/path/to/socket"))]
29 path: PathBuf,
30
31 #[serde(default = "default_unix_mode")]
33 unix_mode: UnixMode,
34
35 #[configurable(metadata(docs::type_unit = "bytes"))]
39 #[configurable(metadata(docs::examples = 65536))]
40 send_buffer_size: Option<usize>,
41}
42
43const fn default_unix_mode() -> UnixMode {
44 UnixMode::Stream
45}
46
47impl UnixConnectorConfig {
48 pub fn from_path<P: AsRef<Path>>(path: P) -> Self {
49 Self {
50 path: path.as_ref().to_path_buf(),
51 unix_mode: UnixMode::Stream,
52 send_buffer_size: None,
53 }
54 }
55
56 pub fn as_connector(&self) -> NetworkConnector {
58 NetworkConnector {
59 inner: ConnectorType::Unix(UnixConnector {
60 path: self.path.clone(),
61 mode: self.unix_mode,
62 send_buffer_size: self.send_buffer_size,
63 }),
64 }
65 }
66}
67
68#[derive(Clone)]
69pub(super) struct UnixConnector {
70 path: PathBuf,
71 mode: UnixMode,
72 send_buffer_size: Option<usize>,
73}
74
75impl UnixConnector {
76 pub(super) async fn connect(&self) -> Result<(PathBuf, UnixEither), NetError> {
77 let either_socket = match self.mode {
78 UnixMode::Datagram => {
79 UnixDatagram::unbound()
80 .context(FailedToBind)
81 .and_then(|datagram| {
82 datagram
83 .connect(&self.path)
84 .context(FailedToConnect)
85 .map(|_| UnixEither::Datagram(datagram))
86 })?
87 }
88 UnixMode::Stream => UnixStream::connect(&self.path)
89 .await
90 .context(FailedToConnect)
91 .map(UnixEither::Stream)?,
92 };
93
94 if let Some(send_buffer_size) = self.send_buffer_size
95 && let Err(error) = net::set_send_buffer_size(&either_socket, send_buffer_size)
96 {
97 warn!(%error, "Failed configuring send buffer size on Unix socket.");
98 }
99
100 Ok((self.path.clone(), either_socket))
101 }
102}