vector/sources/util/net/
mod.rs1#[cfg(feature = "sources-utils-net-tcp")]
2mod tcp;
3#[cfg(feature = "sources-utils-net-udp")]
4mod udp;
5
6use std::{fmt, net::SocketAddr};
7
8use snafu::Snafu;
9use vector_lib::configurable::configurable_component;
10
11use crate::config::{Protocol, Resource};
12
13#[cfg(feature = "sources-utils-net-tcp")]
14pub use self::tcp::{
15 request_limiter::RequestLimiter, try_bind_tcp_listener, TcpNullAcker, TcpSource, TcpSourceAck,
16 TcpSourceAcker, MAX_IN_FLIGHT_EVENTS_TARGET,
17};
18#[cfg(feature = "sources-utils-net-udp")]
19pub use self::udp::try_bind_udp_socket;
20
/// Errors produced when a string cannot be converted into a [`SocketListenAddr`].
#[derive(Clone, Debug, Eq, PartialEq, Snafu)]
pub enum SocketListenAddrParseError {
    /// The input could not be parsed as a socket address.
    // NOTE(review): nothing in the visible part of this file constructs this
    // variant; confirm whether it is still needed.
    #[snafu(display("Unable to parse as socket address"))]
    SocketAddrParse,
    /// The text following `systemd#` is not a valid `usize`.
    #[snafu(display("# after \"systemd\" must be a valid integer"))]
    UsizeParse,
    /// `systemd#0` was given, but the number after `#` is one-based.
    #[snafu(display("Systemd indices start from 1, found 0"))]
    OneBased,
    /// The input is neither a socket address nor one of the `systemd` forms.
    #[snafu(display("Must be a valid IPv4/IPv6 address with port, or start with \"systemd\""))]
    UnableToParse,
}
33
/// An address to listen on.
///
/// It is read from and written to a string. The accepted forms are an IPv4/IPv6
/// address with a port, the word `systemd`, or `systemd#N` where `N` is a
/// one-based index.
#[configurable_component]
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
#[serde(untagged)]
#[serde(try_from = "String", into = "String")]
#[configurable(metadata(docs::examples = "0.0.0.0:9000"))]
#[configurable(metadata(docs::examples = "systemd"))]
#[configurable(metadata(docs::examples = "systemd#3"))]
pub enum SocketListenAddr {
    /// An IPv4/IPv6 address and port.
    SocketAddr(SocketAddr),

    /// A systemd socket, identified by a zero-based index.
    ///
    /// The string `systemd` is index 0, and `systemd#N` is index `N - 1`.
    // NOTE(review): presumably this indexes the file descriptors handed over by
    // systemd socket activation; confirm against the code that binds the socket.
    SystemdFd(usize),
}
58
59impl SocketListenAddr {
60 const fn as_resource(self, protocol: Protocol) -> Resource {
61 match self {
62 Self::SocketAddr(addr) => match protocol {
63 Protocol::Tcp => Resource::tcp(addr),
64 Protocol::Udp => Resource::udp(addr),
65 },
66 Self::SystemdFd(fd_offset) => Resource::SystemFdOffset(fd_offset),
67 }
68 }
69
70 #[cfg(feature = "sources-utils-net-tcp")]
72 pub const fn as_tcp_resource(self) -> Resource {
73 self.as_resource(Protocol::Tcp)
74 }
75
76 #[cfg(feature = "sources-utils-net-udp")]
78 pub const fn as_udp_resource(self) -> Resource {
79 self.as_resource(Protocol::Udp)
80 }
81}
82
83impl fmt::Display for SocketListenAddr {
84 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
85 match self {
86 Self::SocketAddr(addr) => addr.fmt(f),
87 Self::SystemdFd(offset) => write!(f, "systemd socket #{offset}"),
88 }
89 }
90}
91
92impl From<SocketAddr> for SocketListenAddr {
93 fn from(addr: SocketAddr) -> Self {
94 Self::SocketAddr(addr)
95 }
96}
97
98impl From<usize> for SocketListenAddr {
99 fn from(fd: usize) -> Self {
100 Self::SystemdFd(fd)
101 }
102}
103
104impl TryFrom<String> for SocketListenAddr {
105 type Error = SocketListenAddrParseError;
106
107 fn try_from(input: String) -> Result<Self, Self::Error> {
108 match input.parse::<SocketAddr>() {
110 Ok(socket_addr) => Ok(socket_addr.into()),
111
112 Err(_) => {
114 let fd: usize = match input.as_str() {
115 "systemd" => Ok(0),
116 s if s.starts_with("systemd#") => s[8..]
117 .parse::<usize>()
118 .map_err(|_| Self::Error::UsizeParse)?
119 .checked_sub(1)
120 .ok_or(Self::Error::OneBased),
121
122 _ => Err(Self::Error::UnableToParse),
124 }?;
125
126 Ok(fd.into())
127 }
128 }
129 }
130}
131
132impl From<SocketListenAddr> for String {
133 fn from(addr: SocketListenAddr) -> String {
134 match addr {
135 SocketListenAddr::SocketAddr(addr) => addr.to_string(),
136 SocketListenAddr::SystemdFd(fd) => {
137 if fd == 0 {
138 "systemd".to_owned()
139 } else {
140 format!("systemd#{fd}")
141 }
142 }
143 }
144 }
145}
146
#[cfg(test)]
mod test {
    use std::net::{Ipv4Addr, SocketAddr, SocketAddrV4};

    use serde::Deserialize;

    use super::SocketListenAddr;

    /// Minimal wrapper so the address is read through serde deserialization.
    #[derive(Debug, Deserialize)]
    struct Config {
        addr: SocketListenAddr,
    }

    /// Deserializes a TOML snippet into [`Config`] and returns its `addr`.
    fn parse(snippet: &str) -> Result<SocketListenAddr, toml::de::Error> {
        toml::from_str::<Config>(snippet).map(|config| config.addr)
    }

    #[test]
    fn parse_socket_listen_addr_success() {
        // A plain IPv4 address with a port.
        let expected = SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::new(127, 1, 2, 3), 1234));
        assert_eq!(
            parse(r#"addr="127.1.2.3:1234""#).unwrap(),
            SocketListenAddr::SocketAddr(expected)
        );

        // A bare `systemd` is index 0.
        assert_eq!(
            parse(r#"addr="systemd""#).unwrap(),
            SocketListenAddr::SystemdFd(0)
        );

        // The number after `#` is one-based; the stored index is zero-based.
        assert_eq!(
            parse(r#"addr="systemd#3""#).unwrap(),
            SocketListenAddr::SystemdFd(2)
        );
    }

    #[test]
    fn parse_socket_listen_addr_fail() {
        // An address without a port.
        assert!(parse(r#"addr="127.1.2.3""#).is_err());

        // The number after `#` starts at 1.
        assert!(parse(r#"addr="systemd#0""#).is_err());

        // Neither a socket address nor a `systemd` form.
        assert!(parse(r#"addr="system""#).is_err());
    }
}