vector/sources/util/net/
mod.rs

1#[cfg(feature = "sources-utils-net-tcp")]
2mod tcp;
3#[cfg(feature = "sources-utils-net-udp")]
4mod udp;
5
6use std::{fmt, net::SocketAddr};
7
8use snafu::Snafu;
9use vector_lib::configurable::configurable_component;
10
11#[cfg(feature = "sources-utils-net-tcp")]
12pub use self::tcp::{
13    MAX_IN_FLIGHT_EVENTS_TARGET, TcpNullAcker, TcpSource, TcpSourceAck, TcpSourceAcker,
14    request_limiter::RequestLimiter, try_bind_tcp_listener,
15};
16#[cfg(feature = "sources-utils-net-udp")]
17pub use self::udp::try_bind_udp_socket;
18use crate::config::{Protocol, Resource};
19
20#[derive(Clone, Debug, Eq, PartialEq, Snafu)]
21pub enum SocketListenAddrParseError {
22    #[snafu(display("Unable to parse as socket address"))]
23    SocketAddrParse,
24    #[snafu(display("# after \"systemd\" must be a valid integer"))]
25    UsizeParse,
26    #[snafu(display("Systemd indices start from 1, found 0"))]
27    OneBased,
28    // last case evaluated must explain all valid formats accepted
29    #[snafu(display("Must be a valid IPv4/IPv6 address with port, or start with \"systemd\""))]
30    UnableToParse,
31}
32
33/// The socket address to listen for connections on, or `systemd{#N}` to use the Nth socket passed by
34/// systemd socket activation.
35///
36/// If a socket address is used, it _must_ include a port.
37//
38// `SocketListenAddr` is valid for any socket based source, such as `fluent` and `logstash`.
39//  Socket activation is just a way for the program to get a socket for listening on.
40//  Systemd can open the port, if it is a privileged number. That way the program does not
41//  need to worry about dropping ports.
42//  This is particularly common in non-containerized environments.
43#[configurable_component]
44#[derive(Clone, Copy, Debug, PartialEq, Eq)]
45#[serde(untagged)]
46#[serde(try_from = "String", into = "String")]
47#[configurable(metadata(docs::examples = "0.0.0.0:9000"))]
48#[configurable(metadata(docs::examples = "systemd"))]
49#[configurable(metadata(docs::examples = "systemd#3"))]
50pub enum SocketListenAddr {
51    /// An IPv4/IPv6 address and port.
52    SocketAddr(SocketAddr),
53
54    /// A file descriptor identifier that is given from, and managed by, the socket activation feature of `systemd`.
55    SystemdFd(usize),
56}
57
58impl SocketListenAddr {
59    const fn as_resource(self, protocol: Protocol) -> Resource {
60        match self {
61            Self::SocketAddr(addr) => match protocol {
62                Protocol::Tcp => Resource::tcp(addr),
63                Protocol::Udp => Resource::udp(addr),
64            },
65            Self::SystemdFd(fd_offset) => Resource::SystemFdOffset(fd_offset),
66        }
67    }
68
69    /// Gets this listen address as a `Resource`, specifically for TCP.
70    #[cfg(feature = "sources-utils-net-tcp")]
71    pub const fn as_tcp_resource(self) -> Resource {
72        self.as_resource(Protocol::Tcp)
73    }
74
75    /// Gets this listen address as a `Resource`, specifically for UDP.
76    #[cfg(feature = "sources-utils-net-udp")]
77    pub const fn as_udp_resource(self) -> Resource {
78        self.as_resource(Protocol::Udp)
79    }
80}
81
82impl fmt::Display for SocketListenAddr {
83    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
84        match self {
85            Self::SocketAddr(addr) => addr.fmt(f),
86            Self::SystemdFd(offset) => write!(f, "systemd socket #{offset}"),
87        }
88    }
89}
90
91impl From<SocketAddr> for SocketListenAddr {
92    fn from(addr: SocketAddr) -> Self {
93        Self::SocketAddr(addr)
94    }
95}
96
97impl From<usize> for SocketListenAddr {
98    fn from(fd: usize) -> Self {
99        Self::SystemdFd(fd)
100    }
101}
102
103impl TryFrom<String> for SocketListenAddr {
104    type Error = SocketListenAddrParseError;
105
106    fn try_from(input: String) -> Result<Self, Self::Error> {
107        // first attempt to parse the string into a SocketAddr directly
108        match input.parse::<SocketAddr>() {
109            Ok(socket_addr) => Ok(socket_addr.into()),
110
111            // then attempt to parse a systemd file descriptor
112            Err(_) => {
113                let fd: usize = match input.as_str() {
114                    "systemd" => Ok(0),
115                    s if s.starts_with("systemd#") => s[8..]
116                        .parse::<usize>()
117                        .map_err(|_| Self::Error::UsizeParse)?
118                        .checked_sub(1)
119                        .ok_or(Self::Error::OneBased),
120
121                    // otherwise fail
122                    _ => Err(Self::Error::UnableToParse),
123                }?;
124
125                Ok(fd.into())
126            }
127        }
128    }
129}
130
131impl From<SocketListenAddr> for String {
132    fn from(addr: SocketListenAddr) -> String {
133        match addr {
134            SocketListenAddr::SocketAddr(addr) => addr.to_string(),
135            SocketListenAddr::SystemdFd(fd) => {
136                if fd == 0 {
137                    "systemd".to_owned()
138                } else {
139                    format!("systemd#{fd}")
140                }
141            }
142        }
143    }
144}
145
#[cfg(test)]
mod test {
    use std::net::{Ipv4Addr, SocketAddr, SocketAddrV4};

    use serde::Deserialize;

    use super::SocketListenAddr;

    /// Minimal wrapper so the address is deserialized as a field of a TOML document.
    #[derive(Debug, Deserialize)]
    struct Config {
        addr: SocketListenAddr,
    }

    /// Deserializes the TOML document `addr="<value>"` and returns the parsed address.
    fn parse_addr(value: &str) -> Result<SocketListenAddr, toml::de::Error> {
        let document = format!(r#"addr="{value}""#);
        toml::from_str::<Config>(&document).map(|config| config.addr)
    }

    #[test]
    fn parse_socket_listen_addr_success() {
        let cases = [
            (
                "127.1.2.3:1234",
                SocketListenAddr::SocketAddr(SocketAddr::V4(SocketAddrV4::new(
                    Ipv4Addr::new(127, 1, 2, 3),
                    1234,
                ))),
            ),
            // a bare `systemd` selects the first socket
            ("systemd", SocketListenAddr::SystemdFd(0)),
            // one-based index in the config, zero-based offset in the value
            ("systemd#3", SocketListenAddr::SystemdFd(2)),
        ];

        for (input, expected) in cases.iter() {
            assert_eq!(parse_addr(input).unwrap(), *expected);
        }
    }

    #[test]
    fn parse_socket_listen_addr_fail() {
        let rejected = [
            // no port specified
            "127.1.2.3",
            // systemd fd indexing should be one based not zero.
            // the user should leave off the {#N} to get the fd 0.
            "systemd#0",
            // typo
            "system",
        ];

        for input in rejected.iter() {
            assert!(parse_addr(input).is_err());
        }
    }
}