vector/sources/util/net/
mod.rs1#[cfg(feature = "sources-utils-net-tcp")]
2mod tcp;
3#[cfg(feature = "sources-utils-net-udp")]
4mod udp;
5
6use std::{fmt, net::SocketAddr};
7
8use snafu::Snafu;
9use vector_lib::configurable::configurable_component;
10
11#[cfg(feature = "sources-utils-net-tcp")]
12pub use self::tcp::{
13 MAX_IN_FLIGHT_EVENTS_TARGET, TcpNullAcker, TcpSource, TcpSourceAck, TcpSourceAcker,
14 request_limiter::RequestLimiter, try_bind_tcp_listener,
15};
16#[cfg(feature = "sources-utils-net-udp")]
17pub use self::udp::try_bind_udp_socket;
18use crate::config::{Protocol, Resource};
19
20#[derive(Clone, Debug, Eq, PartialEq, Snafu)]
21pub enum SocketListenAddrParseError {
22 #[snafu(display("Unable to parse as socket address"))]
23 SocketAddrParse,
24 #[snafu(display("# after \"systemd\" must be a valid integer"))]
25 UsizeParse,
26 #[snafu(display("Systemd indices start from 1, found 0"))]
27 OneBased,
28 #[snafu(display("Must be a valid IPv4/IPv6 address with port, or start with \"systemd\""))]
30 UnableToParse,
31}
32
33#[configurable_component]
44#[derive(Clone, Copy, Debug, PartialEq, Eq)]
45#[serde(untagged)]
46#[serde(try_from = "String", into = "String")]
47#[configurable(metadata(docs::examples = "0.0.0.0:9000"))]
48#[configurable(metadata(docs::examples = "systemd"))]
49#[configurable(metadata(docs::examples = "systemd#3"))]
50pub enum SocketListenAddr {
51 SocketAddr(SocketAddr),
53
54 SystemdFd(usize),
56}
57
58impl SocketListenAddr {
59 const fn as_resource(self, protocol: Protocol) -> Resource {
60 match self {
61 Self::SocketAddr(addr) => match protocol {
62 Protocol::Tcp => Resource::tcp(addr),
63 Protocol::Udp => Resource::udp(addr),
64 },
65 Self::SystemdFd(fd_offset) => Resource::SystemFdOffset(fd_offset),
66 }
67 }
68
69 #[cfg(feature = "sources-utils-net-tcp")]
71 pub const fn as_tcp_resource(self) -> Resource {
72 self.as_resource(Protocol::Tcp)
73 }
74
75 #[cfg(feature = "sources-utils-net-udp")]
77 pub const fn as_udp_resource(self) -> Resource {
78 self.as_resource(Protocol::Udp)
79 }
80}
81
82impl fmt::Display for SocketListenAddr {
83 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
84 match self {
85 Self::SocketAddr(addr) => addr.fmt(f),
86 Self::SystemdFd(offset) => write!(f, "systemd socket #{offset}"),
87 }
88 }
89}
90
91impl From<SocketAddr> for SocketListenAddr {
92 fn from(addr: SocketAddr) -> Self {
93 Self::SocketAddr(addr)
94 }
95}
96
97impl From<usize> for SocketListenAddr {
98 fn from(fd: usize) -> Self {
99 Self::SystemdFd(fd)
100 }
101}
102
103impl TryFrom<String> for SocketListenAddr {
104 type Error = SocketListenAddrParseError;
105
106 fn try_from(input: String) -> Result<Self, Self::Error> {
107 match input.parse::<SocketAddr>() {
109 Ok(socket_addr) => Ok(socket_addr.into()),
110
111 Err(_) => {
113 let fd: usize = match input.as_str() {
114 "systemd" => Ok(0),
115 s if s.starts_with("systemd#") => s[8..]
116 .parse::<usize>()
117 .map_err(|_| Self::Error::UsizeParse)?
118 .checked_sub(1)
119 .ok_or(Self::Error::OneBased),
120
121 _ => Err(Self::Error::UnableToParse),
123 }?;
124
125 Ok(fd.into())
126 }
127 }
128 }
129}
130
131impl From<SocketListenAddr> for String {
132 fn from(addr: SocketListenAddr) -> String {
133 match addr {
134 SocketListenAddr::SocketAddr(addr) => addr.to_string(),
135 SocketListenAddr::SystemdFd(fd) => {
136 if fd == 0 {
137 "systemd".to_owned()
138 } else {
139 format!("systemd#{fd}")
140 }
141 }
142 }
143 }
144}
145
146#[cfg(test)]
147mod test {
148 use std::net::{Ipv4Addr, SocketAddr, SocketAddrV4};
149
150 use serde::Deserialize;
151
152 use super::SocketListenAddr;
153
154 #[derive(Debug, Deserialize)]
155 struct Config {
156 addr: SocketListenAddr,
157 }
158
159 #[test]
160 fn parse_socket_listen_addr_success() {
161 let test: Config = toml::from_str(r#"addr="127.1.2.3:1234""#).unwrap();
162 assert_eq!(
163 test.addr,
164 SocketListenAddr::SocketAddr(SocketAddr::V4(SocketAddrV4::new(
165 Ipv4Addr::new(127, 1, 2, 3),
166 1234,
167 )))
168 );
169 let test: Config = toml::from_str(r#"addr="systemd""#).unwrap();
170 assert_eq!(test.addr, SocketListenAddr::SystemdFd(0));
171 let test: Config = toml::from_str(r#"addr="systemd#3""#).unwrap();
172 assert_eq!(test.addr, SocketListenAddr::SystemdFd(2));
173 }
174
175 #[test]
176 fn parse_socket_listen_addr_fail() {
177 let test: Result<Config, toml::de::Error> = toml::from_str(r#"addr="127.1.2.3""#);
179 assert!(test.is_err());
180
181 let test: Result<Config, toml::de::Error> = toml::from_str(r#"addr="systemd#0""#);
184 assert!(test.is_err());
185
186 let test: Result<Config, toml::de::Error> = toml::from_str(r#"addr="system""#);
188 assert!(test.is_err());
189 }
190}