vector/sinks/util/service/net/
mod.rs

1mod tcp;
2mod udp;
3
4#[cfg(unix)]
5mod unix;
6
7use std::{
8    io,
9    net::SocketAddr,
10    task::{Context, Poll, ready},
11    time::Duration,
12};
13
14use futures_util::{FutureExt, future::BoxFuture};
15use snafu::{ResultExt, Snafu};
16use tokio::{
17    io::AsyncWriteExt,
18    net::{TcpStream, UdpSocket},
19    sync::oneshot,
20    time::sleep,
21};
22use tower::Service;
23use vector_lib::{
24    configurable::configurable_component,
25    tls::{MaybeTlsStream, TlsError},
26};
27#[cfg(unix)]
28use {crate::sinks::util::unix::UnixEither, std::path::PathBuf};
29
30#[cfg(unix)]
31use self::unix::UnixConnector;
32#[cfg(unix)]
33pub use self::unix::{UnixConnectorConfig, UnixMode};
34use self::{tcp::TcpConnector, udp::UdpConnector};
35pub use self::{tcp::TcpConnectorConfig, udp::UdpConnectorConfig};
36#[cfg(unix)]
37use crate::internal_events::{UnixSendIncompleteError, UnixSocketConnectionEstablished};
38use crate::{
39    common::backoff::ExponentialBackoff,
40    internal_events::{
41        SocketOutgoingConnectionError, TcpSocketConnectionEstablished, UdpSendIncompleteError,
42    },
43    sinks::Healthcheck,
44};
45
46/// Hostname and port tuple.
47///
48/// Both IP addresses and hostnames/fully qualified domain names (FQDNs) are accepted formats.
49///
50/// The address _must_ include a port.
51#[configurable_component]
52#[derive(Clone, Debug)]
53#[serde(try_from = "String", into = "String")]
54#[configurable(title = "The address to connect to.")]
55#[configurable(metadata(docs::examples = "92.12.333.224:5000"))]
56#[configurable(metadata(docs::examples = "somehost:5000"))]
57struct HostAndPort {
58    /// Hostname.
59    host: String,
60
61    /// Port.
62    port: u16,
63}
64
65impl TryFrom<String> for HostAndPort {
66    type Error = String;
67
68    fn try_from(value: String) -> Result<Self, Self::Error> {
69        let uri = value.parse::<http::Uri>().map_err(|e| e.to_string())?;
70        let host = uri
71            .host()
72            .ok_or_else(|| "missing host".to_string())?
73            .to_string();
74        let port = uri.port_u16().ok_or_else(|| "missing port".to_string())?;
75
76        Ok(Self { host, port })
77    }
78}
79
80impl From<HostAndPort> for String {
81    fn from(value: HostAndPort) -> Self {
82        format!("{}:{}", value.host, value.port)
83    }
84}
85
86#[derive(Debug, Snafu)]
87#[snafu(module, context(suffix(false)), visibility(pub))]
88pub enum NetError {
89    #[snafu(display("Address is invalid: {}", reason))]
90    InvalidAddress { reason: String },
91
92    #[snafu(display("Failed to resolve address: {}", source))]
93    FailedToResolve { source: crate::dns::DnsError },
94
95    #[snafu(display("No addresses returned."))]
96    NoAddresses,
97
98    #[snafu(display("Failed to configure socket: {}.", source))]
99    FailedToConfigure { source: std::io::Error },
100
101    #[snafu(display("Failed to configure TLS: {}.", source))]
102    FailedToConfigureTLS { source: TlsError },
103
104    #[snafu(display("Failed to bind socket: {}.", source))]
105    FailedToBind { source: std::io::Error },
106
107    #[snafu(display("Failed to send message: {}", source))]
108    FailedToSend { source: std::io::Error },
109
110    #[snafu(display("Failed to connect to endpoint: {}", source))]
111    FailedToConnect { source: std::io::Error },
112
113    #[snafu(display("Failed to connect to TLS endpoint: {}", source))]
114    FailedToConnectTLS { source: TlsError },
115
116    #[snafu(display("Failed to get socket back after send as channel closed unexpectedly."))]
117    ServiceSocketChannelClosed,
118}
119
120enum NetworkServiceState {
121    /// The service is currently disconnected.
122    Disconnected,
123
124    /// The service is currently attempting to connect to the endpoint.
125    Connecting(BoxFuture<'static, NetworkConnection>),
126
127    /// The service is connected and idle.
128    Connected(NetworkConnection),
129
130    /// The service has an in-flight send to the socket.
131    ///
132    /// If the socket experiences an unrecoverable error during the send, `None` will be returned
133    /// over the channel to signal the need to establish a new connection rather than reusing the
134    /// existing connection.
135    Sending(oneshot::Receiver<Option<NetworkConnection>>),
136}
137
138enum NetworkConnection {
139    Tcp(MaybeTlsStream<TcpStream>),
140    Udp(UdpSocket),
141    #[cfg(unix)]
142    Unix(UnixEither),
143}
144
145impl NetworkConnection {
146    fn on_partial_send(&self, data_size: usize, sent: usize) {
147        match self {
148            // Can't "successfully" partially send with TCP: it either all eventually sends or the
149            // socket has an I/O error that kills the connection entirely.
150            Self::Tcp(_) => {}
151            Self::Udp(_) => {
152                emit!(UdpSendIncompleteError { data_size, sent });
153            }
154            #[cfg(unix)]
155            Self::Unix(_) => {
156                emit!(UnixSendIncompleteError { data_size, sent });
157            }
158        }
159    }
160
161    async fn send(&mut self, buf: &[u8]) -> io::Result<usize> {
162        match self {
163            Self::Tcp(stream) => stream.write_all(buf).await.map(|()| buf.len()),
164            Self::Udp(socket) => socket.send(buf).await,
165            #[cfg(unix)]
166            Self::Unix(socket) => socket.send(buf).await,
167        }
168    }
169}
170
171enum ConnectionMetadata {
172    Tcp {
173        peer_addr: SocketAddr,
174    },
175    #[cfg(unix)]
176    Unix {
177        path: PathBuf,
178    },
179}
180
181#[derive(Clone)]
182enum ConnectorType {
183    Tcp(TcpConnector),
184    Udp(UdpConnector),
185    #[cfg(unix)]
186    Unix(UnixConnector),
187}
188
189/// A connector for generically connecting to a remote network endpoint.
190///
191/// The connection can be based on TCP, UDP, or Unix Domain Sockets.
192#[derive(Clone)]
193pub struct NetworkConnector {
194    inner: ConnectorType,
195}
196
197impl NetworkConnector {
198    fn on_connected(&self, metadata: ConnectionMetadata) {
199        match metadata {
200            ConnectionMetadata::Tcp { peer_addr } => {
201                emit!(TcpSocketConnectionEstablished {
202                    peer_addr: Some(peer_addr)
203                });
204            }
205            #[cfg(unix)]
206            ConnectionMetadata::Unix { path } => {
207                emit!(UnixSocketConnectionEstablished { path: &path });
208            }
209        }
210    }
211
212    fn on_connection_error<E: std::error::Error>(&self, error: E) {
213        emit!(SocketOutgoingConnectionError { error });
214    }
215
216    async fn connect(&self) -> Result<(NetworkConnection, Option<ConnectionMetadata>), NetError> {
217        match &self.inner {
218            ConnectorType::Tcp(connector) => {
219                let (peer_addr, stream) = connector.connect().await?;
220
221                Ok((
222                    NetworkConnection::Tcp(stream),
223                    Some(ConnectionMetadata::Tcp { peer_addr }),
224                ))
225            }
226            ConnectorType::Udp(connector) => {
227                let socket = connector.connect().await?;
228
229                Ok((NetworkConnection::Udp(socket), None))
230            }
231            #[cfg(unix)]
232            ConnectorType::Unix(connector) => {
233                let (path, socket) = connector.connect().await?;
234
235                Ok((
236                    NetworkConnection::Unix(socket),
237                    Some(ConnectionMetadata::Unix { path }),
238                ))
239            }
240        }
241    }
242
243    async fn connect_backoff(&self) -> NetworkConnection {
244        // TODO: Make this configurable.
245        let mut backoff = ExponentialBackoff::from_millis(2)
246            .factor(250)
247            .max_delay(Duration::from_secs(60));
248
249        loop {
250            match self.connect().await {
251                Ok((connection, maybe_metadata)) => {
252                    if let Some(metadata) = maybe_metadata {
253                        self.on_connected(metadata);
254                    }
255
256                    return connection;
257                }
258                Err(error) => {
259                    self.on_connection_error(error);
260                    sleep(backoff.next().unwrap()).await;
261                }
262            }
263        }
264    }
265
266    /// Gets a `Healthcheck` based on the configured destination of this connector.
267    pub fn healthcheck(&self) -> Healthcheck {
268        let connector = self.clone();
269        Box::pin(async move { connector.connect().await.map(|_| ()).map_err(Into::into) })
270    }
271
272    /// Gets a `Service` suitable for sending data to the configured destination of this connector.
273    pub fn service(&self) -> NetworkService {
274        NetworkService::new(self.clone())
275    }
276}
277
278/// A `Service` implementation for generically sending bytes to a remote peer over a network connection.
279///
280/// The connection can be based on TCP, UDP, or Unix Domain Sockets.
281pub struct NetworkService {
282    connector: NetworkConnector,
283    state: NetworkServiceState,
284}
285
286impl NetworkService {
287    const fn new(connector: NetworkConnector) -> Self {
288        Self {
289            connector,
290            state: NetworkServiceState::Disconnected,
291        }
292    }
293}
294
295impl Service<Vec<u8>> for NetworkService {
296    type Response = usize;
297    type Error = NetError;
298    type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;
299
300    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
301        loop {
302            self.state = match &mut self.state {
303                NetworkServiceState::Disconnected => {
304                    let connector = self.connector.clone();
305                    NetworkServiceState::Connecting(Box::pin(async move {
306                        connector.connect_backoff().await
307                    }))
308                }
309                NetworkServiceState::Connecting(fut) => {
310                    let socket = ready!(fut.poll_unpin(cx));
311                    NetworkServiceState::Connected(socket)
312                }
313                NetworkServiceState::Connected(_) => break,
314                NetworkServiceState::Sending(fut) => {
315                    match ready!(fut.poll_unpin(cx)) {
316                        // When a send concludes, and there's an error, the request future sends
317                        // back `None`. Otherwise, it'll send back `Some(...)` with the socket.
318                        Ok(maybe_socket) => match maybe_socket {
319                            Some(socket) => NetworkServiceState::Connected(socket),
320                            None => NetworkServiceState::Disconnected,
321                        },
322                        Err(_) => return Poll::Ready(Err(NetError::ServiceSocketChannelClosed)),
323                    }
324                }
325            };
326        }
327        Poll::Ready(Ok(()))
328    }
329
330    fn call(&mut self, buf: Vec<u8>) -> Self::Future {
331        let (tx, rx) = oneshot::channel();
332
333        let mut socket = match std::mem::replace(&mut self.state, NetworkServiceState::Sending(rx))
334        {
335            NetworkServiceState::Connected(socket) => socket,
336            _ => panic!("poll_ready must be called first"),
337        };
338
339        Box::pin(async move {
340            match socket.send(&buf).await.context(net_error::FailedToSend) {
341                Ok(sent) => {
342                    // Emit an error if we weren't able to send the entire buffer.
343                    if sent != buf.len() {
344                        socket.on_partial_send(buf.len(), sent);
345                    }
346
347                    // Send the socket back to the service, since theoretically it's still valid to
348                    // reuse given that we may have simply overrun the OS socket buffers, etc.
349                    let _ = tx.send(Some(socket));
350
351                    Ok(sent)
352                }
353                Err(e) => {
354                    // We need to signal back to the service that it needs to create a fresh socket
355                    // since this one could be tainted.
356                    let _ = tx.send(None);
357
358                    Err(e)
359                }
360            }
361        })
362    }
363}