vector/sources/util/net/
mod.rs

1#[cfg(feature = "sources-utils-net-tcp")]
2mod tcp;
3#[cfg(feature = "sources-utils-net-udp")]
4mod udp;
5
6use std::{fmt, net::SocketAddr};
7
8use snafu::Snafu;
9use vector_lib::configurable::configurable_component;
10
11use crate::config::{Protocol, Resource};
12
13#[cfg(feature = "sources-utils-net-tcp")]
14pub use self::tcp::{
15    request_limiter::RequestLimiter, try_bind_tcp_listener, TcpNullAcker, TcpSource, TcpSourceAck,
16    TcpSourceAcker, MAX_IN_FLIGHT_EVENTS_TARGET,
17};
18#[cfg(feature = "sources-utils-net-udp")]
19pub use self::udp::try_bind_udp_socket;
20
21#[derive(Clone, Debug, Eq, PartialEq, Snafu)]
22pub enum SocketListenAddrParseError {
23    #[snafu(display("Unable to parse as socket address"))]
24    SocketAddrParse,
25    #[snafu(display("# after \"systemd\" must be a valid integer"))]
26    UsizeParse,
27    #[snafu(display("Systemd indices start from 1, found 0"))]
28    OneBased,
29    // last case evaluated must explain all valid formats accepted
30    #[snafu(display("Must be a valid IPv4/IPv6 address with port, or start with \"systemd\""))]
31    UnableToParse,
32}
33
34/// The socket address to listen for connections on, or `systemd{#N}` to use the Nth socket passed by
35/// systemd socket activation.
36///
37/// If a socket address is used, it _must_ include a port.
38//
39// `SocketListenAddr` is valid for any socket based source, such as `fluent` and `logstash`.
40//  Socket activation is just a way for the program to get a socket for listening on.
41//  Systemd can open the port, if it is a privileged number. That way the program does not
42//  need to worry about dropping ports.
43//  This is particularly common in non-containerized environments.
44#[configurable_component]
45#[derive(Clone, Copy, Debug, PartialEq, Eq)]
46#[serde(untagged)]
47#[serde(try_from = "String", into = "String")]
48#[configurable(metadata(docs::examples = "0.0.0.0:9000"))]
49#[configurable(metadata(docs::examples = "systemd"))]
50#[configurable(metadata(docs::examples = "systemd#3"))]
51pub enum SocketListenAddr {
52    /// An IPv4/IPv6 address and port.
53    SocketAddr(SocketAddr),
54
55    /// A file descriptor identifier that is given from, and managed by, the socket activation feature of `systemd`.
56    SystemdFd(usize),
57}
58
59impl SocketListenAddr {
60    const fn as_resource(self, protocol: Protocol) -> Resource {
61        match self {
62            Self::SocketAddr(addr) => match protocol {
63                Protocol::Tcp => Resource::tcp(addr),
64                Protocol::Udp => Resource::udp(addr),
65            },
66            Self::SystemdFd(fd_offset) => Resource::SystemFdOffset(fd_offset),
67        }
68    }
69
70    /// Gets this listen address as a `Resource`, specifically for TCP.
71    #[cfg(feature = "sources-utils-net-tcp")]
72    pub const fn as_tcp_resource(self) -> Resource {
73        self.as_resource(Protocol::Tcp)
74    }
75
76    /// Gets this listen address as a `Resource`, specifically for UDP.
77    #[cfg(feature = "sources-utils-net-udp")]
78    pub const fn as_udp_resource(self) -> Resource {
79        self.as_resource(Protocol::Udp)
80    }
81}
82
83impl fmt::Display for SocketListenAddr {
84    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
85        match self {
86            Self::SocketAddr(addr) => addr.fmt(f),
87            Self::SystemdFd(offset) => write!(f, "systemd socket #{offset}"),
88        }
89    }
90}
91
92impl From<SocketAddr> for SocketListenAddr {
93    fn from(addr: SocketAddr) -> Self {
94        Self::SocketAddr(addr)
95    }
96}
97
98impl From<usize> for SocketListenAddr {
99    fn from(fd: usize) -> Self {
100        Self::SystemdFd(fd)
101    }
102}
103
104impl TryFrom<String> for SocketListenAddr {
105    type Error = SocketListenAddrParseError;
106
107    fn try_from(input: String) -> Result<Self, Self::Error> {
108        // first attempt to parse the string into a SocketAddr directly
109        match input.parse::<SocketAddr>() {
110            Ok(socket_addr) => Ok(socket_addr.into()),
111
112            // then attempt to parse a systemd file descriptor
113            Err(_) => {
114                let fd: usize = match input.as_str() {
115                    "systemd" => Ok(0),
116                    s if s.starts_with("systemd#") => s[8..]
117                        .parse::<usize>()
118                        .map_err(|_| Self::Error::UsizeParse)?
119                        .checked_sub(1)
120                        .ok_or(Self::Error::OneBased),
121
122                    // otherwise fail
123                    _ => Err(Self::Error::UnableToParse),
124                }?;
125
126                Ok(fd.into())
127            }
128        }
129    }
130}
131
132impl From<SocketListenAddr> for String {
133    fn from(addr: SocketListenAddr) -> String {
134        match addr {
135            SocketListenAddr::SocketAddr(addr) => addr.to_string(),
136            SocketListenAddr::SystemdFd(fd) => {
137                if fd == 0 {
138                    "systemd".to_owned()
139                } else {
140                    format!("systemd#{fd}")
141                }
142            }
143        }
144    }
145}
146
147#[cfg(test)]
148mod test {
149    use std::net::{Ipv4Addr, SocketAddr, SocketAddrV4};
150
151    use serde::Deserialize;
152
153    use super::SocketListenAddr;
154
155    #[derive(Debug, Deserialize)]
156    struct Config {
157        addr: SocketListenAddr,
158    }
159
160    #[test]
161    fn parse_socket_listen_addr_success() {
162        let test: Config = toml::from_str(r#"addr="127.1.2.3:1234""#).unwrap();
163        assert_eq!(
164            test.addr,
165            SocketListenAddr::SocketAddr(SocketAddr::V4(SocketAddrV4::new(
166                Ipv4Addr::new(127, 1, 2, 3),
167                1234,
168            )))
169        );
170        let test: Config = toml::from_str(r#"addr="systemd""#).unwrap();
171        assert_eq!(test.addr, SocketListenAddr::SystemdFd(0));
172        let test: Config = toml::from_str(r#"addr="systemd#3""#).unwrap();
173        assert_eq!(test.addr, SocketListenAddr::SystemdFd(2));
174    }
175
176    #[test]
177    fn parse_socket_listen_addr_fail() {
178        // no port specified
179        let test: Result<Config, toml::de::Error> = toml::from_str(r#"addr="127.1.2.3""#);
180        assert!(test.is_err());
181
182        // systemd fd indexing should be one based not zero.
183        // the user should leave off the {#N} to get the fd 0.
184        let test: Result<Config, toml::de::Error> = toml::from_str(r#"addr="systemd#0""#);
185        assert!(test.is_err());
186
187        // typo
188        let test: Result<Config, toml::de::Error> = toml::from_str(r#"addr="system""#);
189        assert!(test.is_err());
190    }
191}