// vector/sinks/util/service/net/mod.rs

1mod tcp;
2mod udp;
3
4#[cfg(unix)]
5mod unix;
6
7use std::{
8    io,
9    net::SocketAddr,
10    task::{Context, Poll, ready},
11};
12
13use futures_util::{FutureExt, future::BoxFuture};
14use snafu::{ResultExt, Snafu};
15use tokio::{
16    io::AsyncWriteExt,
17    net::{TcpStream, UdpSocket},
18    sync::oneshot,
19    time::sleep,
20};
21use tower::Service;
22use vector_lib::{
23    configurable::configurable_component,
24    tls::{MaybeTlsStream, TlsError},
25};
26#[cfg(unix)]
27use {crate::sinks::util::unix::UnixEither, std::path::PathBuf};
28
29#[cfg(unix)]
30use self::unix::UnixConnector;
31#[cfg(unix)]
32pub use self::unix::{UnixConnectorConfig, UnixMode};
33use self::{tcp::TcpConnector, udp::UdpConnector};
34pub use self::{tcp::TcpConnectorConfig, udp::UdpConnectorConfig};
35#[cfg(unix)]
36use crate::internal_events::{UnixSendIncompleteError, UnixSocketConnectionEstablished};
37use crate::{
38    common::backoff::ExponentialBackoff,
39    internal_events::{
40        SocketOutgoingConnectionError, TcpSocketConnectionEstablished, UdpSendIncompleteError,
41    },
42    sinks::Healthcheck,
43};
44
/// Hostname and port tuple.
///
/// Both IP addresses and hostnames/fully qualified domain names (FQDNs) are accepted formats.
///
/// The address _must_ include a port.
#[configurable_component]
#[derive(Clone, Debug)]
// (De)serialization is delegated to the `TryFrom<String>` / `Into<String>` conversions, so the
// value is read and written as one string instead of as separate `host` and `port` fields.
#[serde(try_from = "String", into = "String")]
#[configurable(title = "The address to connect to.")]
// NOTE(review): `333` is not a valid IPv4 octet, so this example is not a real IP address --
// consider replacing it with a well-formed one.
#[configurable(metadata(docs::examples = "92.12.333.224:5000"))]
#[configurable(metadata(docs::examples = "somehost:5000"))]
struct HostAndPort {
    /// Hostname.
    host: String,

    /// Port.
    port: u16,
}
63
64impl TryFrom<String> for HostAndPort {
65    type Error = String;
66
67    fn try_from(value: String) -> Result<Self, Self::Error> {
68        let uri = value.parse::<http::Uri>().map_err(|e| e.to_string())?;
69        let host = uri
70            .host()
71            .ok_or_else(|| "missing host".to_string())?
72            .to_string();
73        let port = uri.port_u16().ok_or_else(|| "missing port".to_string())?;
74
75        Ok(Self { host, port })
76    }
77}
78
79impl From<HostAndPort> for String {
80    fn from(value: HostAndPort) -> Self {
81        format!("{}:{}", value.host, value.port)
82    }
83}
84
85#[derive(Debug, Snafu)]
86#[snafu(module, context(suffix(false)), visibility(pub))]
87pub enum NetError {
88    #[snafu(display("Address is invalid: {}", reason))]
89    InvalidAddress { reason: String },
90
91    #[snafu(display("Failed to resolve address: {}", source))]
92    FailedToResolve { source: crate::dns::DnsError },
93
94    #[snafu(display("No addresses returned."))]
95    NoAddresses,
96
97    #[snafu(display("Failed to configure socket: {}.", source))]
98    FailedToConfigure { source: std::io::Error },
99
100    #[snafu(display("Failed to configure TLS: {}.", source))]
101    FailedToConfigureTLS { source: TlsError },
102
103    #[snafu(display("Failed to bind socket: {}.", source))]
104    FailedToBind { source: std::io::Error },
105
106    #[snafu(display("Failed to send message: {}", source))]
107    FailedToSend { source: std::io::Error },
108
109    #[snafu(display("Failed to connect to endpoint: {}", source))]
110    FailedToConnect { source: std::io::Error },
111
112    #[snafu(display("Failed to connect to TLS endpoint: {}", source))]
113    FailedToConnectTLS { source: TlsError },
114
115    #[snafu(display("Failed to get socket back after send as channel closed unexpectedly."))]
116    ServiceSocketChannelClosed,
117}
118
119enum NetworkServiceState {
120    /// The service is currently disconnected.
121    Disconnected,
122
123    /// The service is currently attempting to connect to the endpoint.
124    Connecting(BoxFuture<'static, NetworkConnection>),
125
126    /// The service is connected and idle.
127    Connected(NetworkConnection),
128
129    /// The service has an in-flight send to the socket.
130    ///
131    /// If the socket experiences an unrecoverable error during the send, `None` will be returned
132    /// over the channel to signal the need to establish a new connection rather than reusing the
133    /// existing connection.
134    Sending(oneshot::Receiver<Option<NetworkConnection>>),
135}
136
137enum NetworkConnection {
138    Tcp(MaybeTlsStream<TcpStream>),
139    Udp(UdpSocket),
140    #[cfg(unix)]
141    Unix(UnixEither),
142}
143
144impl NetworkConnection {
145    fn on_partial_send(&self, data_size: usize, sent: usize) {
146        match self {
147            // Can't "successfully" partially send with TCP: it either all eventually sends or the
148            // socket has an I/O error that kills the connection entirely.
149            Self::Tcp(_) => {}
150            Self::Udp(_) => {
151                emit!(UdpSendIncompleteError { data_size, sent });
152            }
153            #[cfg(unix)]
154            Self::Unix(_) => {
155                emit!(UnixSendIncompleteError { data_size, sent });
156            }
157        }
158    }
159
160    async fn send(&mut self, buf: &[u8]) -> io::Result<usize> {
161        match self {
162            Self::Tcp(stream) => stream.write_all(buf).await.map(|()| buf.len()),
163            Self::Udp(socket) => socket.send(buf).await,
164            #[cfg(unix)]
165            Self::Unix(socket) => socket.send(buf).await,
166        }
167    }
168}
169
/// Details about a freshly established connection, used when emitting the
/// "connection established" internal event.
enum ConnectionMetadata {
    /// A TCP connection was established.
    Tcp {
        /// Address of the remote peer.
        peer_addr: SocketAddr,
    },
    /// A Unix domain socket connection was established (Unix targets only).
    #[cfg(unix)]
    Unix {
        /// Filesystem path of the socket.
        path: PathBuf,
    },
}
179
180#[derive(Clone)]
181enum ConnectorType {
182    Tcp(TcpConnector),
183    Udp(UdpConnector),
184    #[cfg(unix)]
185    Unix(UnixConnector),
186}
187
188/// A connector for generically connecting to a remote network endpoint.
189///
190/// The connection can be based on TCP, UDP, or Unix Domain Sockets.
191#[derive(Clone)]
192pub struct NetworkConnector {
193    inner: ConnectorType,
194}
195
196impl NetworkConnector {
197    fn on_connected(&self, metadata: ConnectionMetadata) {
198        match metadata {
199            ConnectionMetadata::Tcp { peer_addr } => {
200                emit!(TcpSocketConnectionEstablished {
201                    peer_addr: Some(peer_addr)
202                });
203            }
204            #[cfg(unix)]
205            ConnectionMetadata::Unix { path } => {
206                emit!(UnixSocketConnectionEstablished { path: &path });
207            }
208        }
209    }
210
211    fn on_connection_error<E: std::error::Error>(&self, error: E) {
212        emit!(SocketOutgoingConnectionError { error });
213    }
214
215    async fn connect(&self) -> Result<(NetworkConnection, Option<ConnectionMetadata>), NetError> {
216        match &self.inner {
217            ConnectorType::Tcp(connector) => {
218                let (peer_addr, stream) = connector.connect().await?;
219
220                Ok((
221                    NetworkConnection::Tcp(stream),
222                    Some(ConnectionMetadata::Tcp { peer_addr }),
223                ))
224            }
225            ConnectorType::Udp(connector) => {
226                let socket = connector.connect().await?;
227
228                Ok((NetworkConnection::Udp(socket), None))
229            }
230            #[cfg(unix)]
231            ConnectorType::Unix(connector) => {
232                let (path, socket) = connector.connect().await?;
233
234                Ok((
235                    NetworkConnection::Unix(socket),
236                    Some(ConnectionMetadata::Unix { path }),
237                ))
238            }
239        }
240    }
241
242    async fn connect_backoff(&self) -> NetworkConnection {
243        // TODO: Make this configurable.
244        let mut backoff = ExponentialBackoff::default();
245
246        loop {
247            match self.connect().await {
248                Ok((connection, maybe_metadata)) => {
249                    if let Some(metadata) = maybe_metadata {
250                        self.on_connected(metadata);
251                    }
252
253                    return connection;
254                }
255                Err(error) => {
256                    self.on_connection_error(error);
257                    sleep(backoff.next().unwrap()).await;
258                }
259            }
260        }
261    }
262
263    /// Gets a `Healthcheck` based on the configured destination of this connector.
264    pub fn healthcheck(&self) -> Healthcheck {
265        let connector = self.clone();
266        Box::pin(async move { connector.connect().await.map(|_| ()).map_err(Into::into) })
267    }
268
269    /// Gets a `Service` suitable for sending data to the configured destination of this connector.
270    pub fn service(&self) -> NetworkService {
271        NetworkService::new(self.clone())
272    }
273}
274
275/// A `Service` implementation for generically sending bytes to a remote peer over a network connection.
276///
277/// The connection can be based on TCP, UDP, or Unix Domain Sockets.
278pub struct NetworkService {
279    connector: NetworkConnector,
280    state: NetworkServiceState,
281}
282
283impl NetworkService {
284    const fn new(connector: NetworkConnector) -> Self {
285        Self {
286            connector,
287            state: NetworkServiceState::Disconnected,
288        }
289    }
290}
291
292impl Service<Vec<u8>> for NetworkService {
293    type Response = usize;
294    type Error = NetError;
295    type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;
296
297    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
298        loop {
299            self.state = match &mut self.state {
300                NetworkServiceState::Disconnected => {
301                    let connector = self.connector.clone();
302                    NetworkServiceState::Connecting(Box::pin(async move {
303                        connector.connect_backoff().await
304                    }))
305                }
306                NetworkServiceState::Connecting(fut) => {
307                    let socket = ready!(fut.poll_unpin(cx));
308                    NetworkServiceState::Connected(socket)
309                }
310                NetworkServiceState::Connected(_) => break,
311                NetworkServiceState::Sending(fut) => {
312                    match ready!(fut.poll_unpin(cx)) {
313                        // When a send concludes, and there's an error, the request future sends
314                        // back `None`. Otherwise, it'll send back `Some(...)` with the socket.
315                        Ok(maybe_socket) => match maybe_socket {
316                            Some(socket) => NetworkServiceState::Connected(socket),
317                            None => NetworkServiceState::Disconnected,
318                        },
319                        Err(_) => return Poll::Ready(Err(NetError::ServiceSocketChannelClosed)),
320                    }
321                }
322            };
323        }
324        Poll::Ready(Ok(()))
325    }
326
327    fn call(&mut self, buf: Vec<u8>) -> Self::Future {
328        let (tx, rx) = oneshot::channel();
329
330        let mut socket = match std::mem::replace(&mut self.state, NetworkServiceState::Sending(rx))
331        {
332            NetworkServiceState::Connected(socket) => socket,
333            _ => panic!("poll_ready must be called first"),
334        };
335
336        Box::pin(async move {
337            match socket.send(&buf).await.context(net_error::FailedToSend) {
338                Ok(sent) => {
339                    // Emit an error if we weren't able to send the entire buffer.
340                    if sent != buf.len() {
341                        socket.on_partial_send(buf.len(), sent);
342                    }
343
344                    // Send the socket back to the service, since theoretically it's still valid to
345                    // reuse given that we may have simply overrun the OS socket buffers, etc.
346                    let _ = tx.send(Some(socket));
347
348                    Ok(sent)
349                }
350                Err(e) => {
351                    // We need to signal back to the service that it needs to create a fresh socket
352                    // since this one could be tainted.
353                    let _ = tx.send(None);
354
355                    Err(e)
356                }
357            }
358        })
359    }
360}