vector/sinks/util/service/net/
mod.rs

1mod tcp;
2mod udp;
3
4#[cfg(unix)]
5mod unix;
6
7use std::{
8    io,
9    net::SocketAddr,
10    task::{ready, Context, Poll},
11    time::Duration,
12};
13
14#[cfg(unix)]
15use {crate::sinks::util::unix::UnixEither, std::path::PathBuf};
16
17use crate::{
18    common::backoff::ExponentialBackoff,
19    internal_events::{
20        SocketOutgoingConnectionError, TcpSocketConnectionEstablished, UdpSendIncompleteError,
21    },
22    sinks::Healthcheck,
23};
24
25#[cfg(unix)]
26use crate::internal_events::{UnixSendIncompleteError, UnixSocketConnectionEstablished};
27
28pub use self::tcp::TcpConnectorConfig;
29pub use self::udp::UdpConnectorConfig;
30
31#[cfg(unix)]
32pub use self::unix::{UnixConnectorConfig, UnixMode};
33
34use self::tcp::TcpConnector;
35use self::udp::UdpConnector;
36#[cfg(unix)]
37use self::unix::UnixConnector;
38
39use futures_util::{future::BoxFuture, FutureExt};
40use snafu::{ResultExt, Snafu};
41use tokio::{
42    io::AsyncWriteExt,
43    net::{TcpStream, UdpSocket},
44    sync::oneshot,
45    time::sleep,
46};
47use tower::Service;
48use vector_lib::configurable::configurable_component;
49use vector_lib::tls::{MaybeTlsStream, TlsError};
50
51/// Hostname and port tuple.
52///
53/// Both IP addresses and hostnames/fully qualified domain names (FQDNs) are accepted formats.
54///
55/// The address _must_ include a port.
56#[configurable_component]
57#[derive(Clone, Debug)]
58#[serde(try_from = "String", into = "String")]
59#[configurable(title = "The address to connect to.")]
60#[configurable(metadata(docs::examples = "92.12.333.224:5000"))]
61#[configurable(metadata(docs::examples = "somehost:5000"))]
62struct HostAndPort {
63    /// Hostname.
64    host: String,
65
66    /// Port.
67    port: u16,
68}
69
70impl TryFrom<String> for HostAndPort {
71    type Error = String;
72
73    fn try_from(value: String) -> Result<Self, Self::Error> {
74        let uri = value.parse::<http::Uri>().map_err(|e| e.to_string())?;
75        let host = uri
76            .host()
77            .ok_or_else(|| "missing host".to_string())?
78            .to_string();
79        let port = uri.port_u16().ok_or_else(|| "missing port".to_string())?;
80
81        Ok(Self { host, port })
82    }
83}
84
85impl From<HostAndPort> for String {
86    fn from(value: HostAndPort) -> Self {
87        format!("{}:{}", value.host, value.port)
88    }
89}
90
91#[derive(Debug, Snafu)]
92#[snafu(module, context(suffix(false)), visibility(pub))]
93pub enum NetError {
94    #[snafu(display("Address is invalid: {}", reason))]
95    InvalidAddress { reason: String },
96
97    #[snafu(display("Failed to resolve address: {}", source))]
98    FailedToResolve { source: crate::dns::DnsError },
99
100    #[snafu(display("No addresses returned."))]
101    NoAddresses,
102
103    #[snafu(display("Failed to configure socket: {}.", source))]
104    FailedToConfigure { source: std::io::Error },
105
106    #[snafu(display("Failed to configure TLS: {}.", source))]
107    FailedToConfigureTLS { source: TlsError },
108
109    #[snafu(display("Failed to bind socket: {}.", source))]
110    FailedToBind { source: std::io::Error },
111
112    #[snafu(display("Failed to send message: {}", source))]
113    FailedToSend { source: std::io::Error },
114
115    #[snafu(display("Failed to connect to endpoint: {}", source))]
116    FailedToConnect { source: std::io::Error },
117
118    #[snafu(display("Failed to connect to TLS endpoint: {}", source))]
119    FailedToConnectTLS { source: TlsError },
120
121    #[snafu(display("Failed to get socket back after send as channel closed unexpectedly."))]
122    ServiceSocketChannelClosed,
123}
124
125enum NetworkServiceState {
126    /// The service is currently disconnected.
127    Disconnected,
128
129    /// The service is currently attempting to connect to the endpoint.
130    Connecting(BoxFuture<'static, NetworkConnection>),
131
132    /// The service is connected and idle.
133    Connected(NetworkConnection),
134
135    /// The service has an in-flight send to the socket.
136    ///
137    /// If the socket experiences an unrecoverable error during the send, `None` will be returned
138    /// over the channel to signal the need to establish a new connection rather than reusing the
139    /// existing connection.
140    Sending(oneshot::Receiver<Option<NetworkConnection>>),
141}
142
143enum NetworkConnection {
144    Tcp(MaybeTlsStream<TcpStream>),
145    Udp(UdpSocket),
146    #[cfg(unix)]
147    Unix(UnixEither),
148}
149
150impl NetworkConnection {
151    fn on_partial_send(&self, data_size: usize, sent: usize) {
152        match self {
153            // Can't "successfully" partially send with TCP: it either all eventually sends or the
154            // socket has an I/O error that kills the connection entirely.
155            Self::Tcp(_) => {}
156            Self::Udp(_) => {
157                emit!(UdpSendIncompleteError { data_size, sent });
158            }
159            #[cfg(unix)]
160            Self::Unix(_) => {
161                emit!(UnixSendIncompleteError { data_size, sent });
162            }
163        }
164    }
165
166    async fn send(&mut self, buf: &[u8]) -> io::Result<usize> {
167        match self {
168            Self::Tcp(stream) => stream.write_all(buf).await.map(|()| buf.len()),
169            Self::Udp(socket) => socket.send(buf).await,
170            #[cfg(unix)]
171            Self::Unix(socket) => socket.send(buf).await,
172        }
173    }
174}
175
176enum ConnectionMetadata {
177    Tcp {
178        peer_addr: SocketAddr,
179    },
180    #[cfg(unix)]
181    Unix {
182        path: PathBuf,
183    },
184}
185
186#[derive(Clone)]
187enum ConnectorType {
188    Tcp(TcpConnector),
189    Udp(UdpConnector),
190    #[cfg(unix)]
191    Unix(UnixConnector),
192}
193
194/// A connector for generically connecting to a remote network endpoint.
195///
196/// The connection can be based on TCP, UDP, or Unix Domain Sockets.
197#[derive(Clone)]
198pub struct NetworkConnector {
199    inner: ConnectorType,
200}
201
202impl NetworkConnector {
203    fn on_connected(&self, metadata: ConnectionMetadata) {
204        match metadata {
205            ConnectionMetadata::Tcp { peer_addr } => {
206                emit!(TcpSocketConnectionEstablished {
207                    peer_addr: Some(peer_addr)
208                });
209            }
210            #[cfg(unix)]
211            ConnectionMetadata::Unix { path } => {
212                emit!(UnixSocketConnectionEstablished { path: &path });
213            }
214        }
215    }
216
217    fn on_connection_error<E: std::error::Error>(&self, error: E) {
218        emit!(SocketOutgoingConnectionError { error });
219    }
220
221    async fn connect(&self) -> Result<(NetworkConnection, Option<ConnectionMetadata>), NetError> {
222        match &self.inner {
223            ConnectorType::Tcp(connector) => {
224                let (peer_addr, stream) = connector.connect().await?;
225
226                Ok((
227                    NetworkConnection::Tcp(stream),
228                    Some(ConnectionMetadata::Tcp { peer_addr }),
229                ))
230            }
231            ConnectorType::Udp(connector) => {
232                let socket = connector.connect().await?;
233
234                Ok((NetworkConnection::Udp(socket), None))
235            }
236            #[cfg(unix)]
237            ConnectorType::Unix(connector) => {
238                let (path, socket) = connector.connect().await?;
239
240                Ok((
241                    NetworkConnection::Unix(socket),
242                    Some(ConnectionMetadata::Unix { path }),
243                ))
244            }
245        }
246    }
247
248    async fn connect_backoff(&self) -> NetworkConnection {
249        // TODO: Make this configurable.
250        let mut backoff = ExponentialBackoff::from_millis(2)
251            .factor(250)
252            .max_delay(Duration::from_secs(60));
253
254        loop {
255            match self.connect().await {
256                Ok((connection, maybe_metadata)) => {
257                    if let Some(metadata) = maybe_metadata {
258                        self.on_connected(metadata);
259                    }
260
261                    return connection;
262                }
263                Err(error) => {
264                    self.on_connection_error(error);
265                    sleep(backoff.next().unwrap()).await;
266                }
267            }
268        }
269    }
270
271    /// Gets a `Healthcheck` based on the configured destination of this connector.
272    pub fn healthcheck(&self) -> Healthcheck {
273        let connector = self.clone();
274        Box::pin(async move { connector.connect().await.map(|_| ()).map_err(Into::into) })
275    }
276
277    /// Gets a `Service` suitable for sending data to the configured destination of this connector.
278    pub fn service(&self) -> NetworkService {
279        NetworkService::new(self.clone())
280    }
281}
282
283/// A `Service` implementation for generically sending bytes to a remote peer over a network connection.
284///
285/// The connection can be based on TCP, UDP, or Unix Domain Sockets.
286pub struct NetworkService {
287    connector: NetworkConnector,
288    state: NetworkServiceState,
289}
290
291impl NetworkService {
292    const fn new(connector: NetworkConnector) -> Self {
293        Self {
294            connector,
295            state: NetworkServiceState::Disconnected,
296        }
297    }
298}
299
300impl Service<Vec<u8>> for NetworkService {
301    type Response = usize;
302    type Error = NetError;
303    type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;
304
305    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
306        loop {
307            self.state = match &mut self.state {
308                NetworkServiceState::Disconnected => {
309                    let connector = self.connector.clone();
310                    NetworkServiceState::Connecting(Box::pin(async move {
311                        connector.connect_backoff().await
312                    }))
313                }
314                NetworkServiceState::Connecting(fut) => {
315                    let socket = ready!(fut.poll_unpin(cx));
316                    NetworkServiceState::Connected(socket)
317                }
318                NetworkServiceState::Connected(_) => break,
319                NetworkServiceState::Sending(fut) => {
320                    match ready!(fut.poll_unpin(cx)) {
321                        // When a send concludes, and there's an error, the request future sends
322                        // back `None`. Otherwise, it'll send back `Some(...)` with the socket.
323                        Ok(maybe_socket) => match maybe_socket {
324                            Some(socket) => NetworkServiceState::Connected(socket),
325                            None => NetworkServiceState::Disconnected,
326                        },
327                        Err(_) => return Poll::Ready(Err(NetError::ServiceSocketChannelClosed)),
328                    }
329                }
330            };
331        }
332        Poll::Ready(Ok(()))
333    }
334
335    fn call(&mut self, buf: Vec<u8>) -> Self::Future {
336        let (tx, rx) = oneshot::channel();
337
338        let mut socket = match std::mem::replace(&mut self.state, NetworkServiceState::Sending(rx))
339        {
340            NetworkServiceState::Connected(socket) => socket,
341            _ => panic!("poll_ready must be called first"),
342        };
343
344        Box::pin(async move {
345            match socket.send(&buf).await.context(net_error::FailedToSend) {
346                Ok(sent) => {
347                    // Emit an error if we weren't able to send the entire buffer.
348                    if sent != buf.len() {
349                        socket.on_partial_send(buf.len(), sent);
350                    }
351
352                    // Send the socket back to the service, since theoretically it's still valid to
353                    // reuse given that we may have simply overrun the OS socket buffers, etc.
354                    let _ = tx.send(Some(socket));
355
356                    Ok(sent)
357                }
358                Err(e) => {
359                    // We need to signal back to the service that it needs to create a fresh socket
360                    // since this one could be tainted.
361                    let _ = tx.send(None);
362
363                    Err(e)
364                }
365            }
366        })
367    }
368}