vector/sinks/util/service/net/
mod.rs1mod tcp;
2mod udp;
3
4#[cfg(unix)]
5mod unix;
6
7use std::{
8 io,
9 net::SocketAddr,
10 task::{ready, Context, Poll},
11 time::Duration,
12};
13
14#[cfg(unix)]
15use {crate::sinks::util::unix::UnixEither, std::path::PathBuf};
16
17use crate::{
18 common::backoff::ExponentialBackoff,
19 internal_events::{
20 SocketOutgoingConnectionError, TcpSocketConnectionEstablished, UdpSendIncompleteError,
21 },
22 sinks::Healthcheck,
23};
24
25#[cfg(unix)]
26use crate::internal_events::{UnixSendIncompleteError, UnixSocketConnectionEstablished};
27
28pub use self::tcp::TcpConnectorConfig;
29pub use self::udp::UdpConnectorConfig;
30
31#[cfg(unix)]
32pub use self::unix::{UnixConnectorConfig, UnixMode};
33
34use self::tcp::TcpConnector;
35use self::udp::UdpConnector;
36#[cfg(unix)]
37use self::unix::UnixConnector;
38
39use futures_util::{future::BoxFuture, FutureExt};
40use snafu::{ResultExt, Snafu};
41use tokio::{
42 io::AsyncWriteExt,
43 net::{TcpStream, UdpSocket},
44 sync::oneshot,
45 time::sleep,
46};
47use tower::Service;
48use vector_lib::configurable::configurable_component;
49use vector_lib::tls::{MaybeTlsStream, TlsError};
50
51#[configurable_component]
57#[derive(Clone, Debug)]
58#[serde(try_from = "String", into = "String")]
59#[configurable(title = "The address to connect to.")]
60#[configurable(metadata(docs::examples = "92.12.333.224:5000"))]
61#[configurable(metadata(docs::examples = "somehost:5000"))]
62struct HostAndPort {
63 host: String,
65
66 port: u16,
68}
69
70impl TryFrom<String> for HostAndPort {
71 type Error = String;
72
73 fn try_from(value: String) -> Result<Self, Self::Error> {
74 let uri = value.parse::<http::Uri>().map_err(|e| e.to_string())?;
75 let host = uri
76 .host()
77 .ok_or_else(|| "missing host".to_string())?
78 .to_string();
79 let port = uri.port_u16().ok_or_else(|| "missing port".to_string())?;
80
81 Ok(Self { host, port })
82 }
83}
84
85impl From<HostAndPort> for String {
86 fn from(value: HostAndPort) -> Self {
87 format!("{}:{}", value.host, value.port)
88 }
89}
90
91#[derive(Debug, Snafu)]
92#[snafu(module, context(suffix(false)), visibility(pub))]
93pub enum NetError {
94 #[snafu(display("Address is invalid: {}", reason))]
95 InvalidAddress { reason: String },
96
97 #[snafu(display("Failed to resolve address: {}", source))]
98 FailedToResolve { source: crate::dns::DnsError },
99
100 #[snafu(display("No addresses returned."))]
101 NoAddresses,
102
103 #[snafu(display("Failed to configure socket: {}.", source))]
104 FailedToConfigure { source: std::io::Error },
105
106 #[snafu(display("Failed to configure TLS: {}.", source))]
107 FailedToConfigureTLS { source: TlsError },
108
109 #[snafu(display("Failed to bind socket: {}.", source))]
110 FailedToBind { source: std::io::Error },
111
112 #[snafu(display("Failed to send message: {}", source))]
113 FailedToSend { source: std::io::Error },
114
115 #[snafu(display("Failed to connect to endpoint: {}", source))]
116 FailedToConnect { source: std::io::Error },
117
118 #[snafu(display("Failed to connect to TLS endpoint: {}", source))]
119 FailedToConnectTLS { source: TlsError },
120
121 #[snafu(display("Failed to get socket back after send as channel closed unexpectedly."))]
122 ServiceSocketChannelClosed,
123}
124
125enum NetworkServiceState {
126 Disconnected,
128
129 Connecting(BoxFuture<'static, NetworkConnection>),
131
132 Connected(NetworkConnection),
134
135 Sending(oneshot::Receiver<Option<NetworkConnection>>),
141}
142
143enum NetworkConnection {
144 Tcp(MaybeTlsStream<TcpStream>),
145 Udp(UdpSocket),
146 #[cfg(unix)]
147 Unix(UnixEither),
148}
149
150impl NetworkConnection {
151 fn on_partial_send(&self, data_size: usize, sent: usize) {
152 match self {
153 Self::Tcp(_) => {}
156 Self::Udp(_) => {
157 emit!(UdpSendIncompleteError { data_size, sent });
158 }
159 #[cfg(unix)]
160 Self::Unix(_) => {
161 emit!(UnixSendIncompleteError { data_size, sent });
162 }
163 }
164 }
165
166 async fn send(&mut self, buf: &[u8]) -> io::Result<usize> {
167 match self {
168 Self::Tcp(stream) => stream.write_all(buf).await.map(|()| buf.len()),
169 Self::Udp(socket) => socket.send(buf).await,
170 #[cfg(unix)]
171 Self::Unix(socket) => socket.send(buf).await,
172 }
173 }
174}
175
176enum ConnectionMetadata {
177 Tcp {
178 peer_addr: SocketAddr,
179 },
180 #[cfg(unix)]
181 Unix {
182 path: PathBuf,
183 },
184}
185
186#[derive(Clone)]
187enum ConnectorType {
188 Tcp(TcpConnector),
189 Udp(UdpConnector),
190 #[cfg(unix)]
191 Unix(UnixConnector),
192}
193
194#[derive(Clone)]
198pub struct NetworkConnector {
199 inner: ConnectorType,
200}
201
202impl NetworkConnector {
203 fn on_connected(&self, metadata: ConnectionMetadata) {
204 match metadata {
205 ConnectionMetadata::Tcp { peer_addr } => {
206 emit!(TcpSocketConnectionEstablished {
207 peer_addr: Some(peer_addr)
208 });
209 }
210 #[cfg(unix)]
211 ConnectionMetadata::Unix { path } => {
212 emit!(UnixSocketConnectionEstablished { path: &path });
213 }
214 }
215 }
216
217 fn on_connection_error<E: std::error::Error>(&self, error: E) {
218 emit!(SocketOutgoingConnectionError { error });
219 }
220
221 async fn connect(&self) -> Result<(NetworkConnection, Option<ConnectionMetadata>), NetError> {
222 match &self.inner {
223 ConnectorType::Tcp(connector) => {
224 let (peer_addr, stream) = connector.connect().await?;
225
226 Ok((
227 NetworkConnection::Tcp(stream),
228 Some(ConnectionMetadata::Tcp { peer_addr }),
229 ))
230 }
231 ConnectorType::Udp(connector) => {
232 let socket = connector.connect().await?;
233
234 Ok((NetworkConnection::Udp(socket), None))
235 }
236 #[cfg(unix)]
237 ConnectorType::Unix(connector) => {
238 let (path, socket) = connector.connect().await?;
239
240 Ok((
241 NetworkConnection::Unix(socket),
242 Some(ConnectionMetadata::Unix { path }),
243 ))
244 }
245 }
246 }
247
248 async fn connect_backoff(&self) -> NetworkConnection {
249 let mut backoff = ExponentialBackoff::from_millis(2)
251 .factor(250)
252 .max_delay(Duration::from_secs(60));
253
254 loop {
255 match self.connect().await {
256 Ok((connection, maybe_metadata)) => {
257 if let Some(metadata) = maybe_metadata {
258 self.on_connected(metadata);
259 }
260
261 return connection;
262 }
263 Err(error) => {
264 self.on_connection_error(error);
265 sleep(backoff.next().unwrap()).await;
266 }
267 }
268 }
269 }
270
271 pub fn healthcheck(&self) -> Healthcheck {
273 let connector = self.clone();
274 Box::pin(async move { connector.connect().await.map(|_| ()).map_err(Into::into) })
275 }
276
277 pub fn service(&self) -> NetworkService {
279 NetworkService::new(self.clone())
280 }
281}
282
283pub struct NetworkService {
287 connector: NetworkConnector,
288 state: NetworkServiceState,
289}
290
291impl NetworkService {
292 const fn new(connector: NetworkConnector) -> Self {
293 Self {
294 connector,
295 state: NetworkServiceState::Disconnected,
296 }
297 }
298}
299
300impl Service<Vec<u8>> for NetworkService {
301 type Response = usize;
302 type Error = NetError;
303 type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;
304
305 fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
306 loop {
307 self.state = match &mut self.state {
308 NetworkServiceState::Disconnected => {
309 let connector = self.connector.clone();
310 NetworkServiceState::Connecting(Box::pin(async move {
311 connector.connect_backoff().await
312 }))
313 }
314 NetworkServiceState::Connecting(fut) => {
315 let socket = ready!(fut.poll_unpin(cx));
316 NetworkServiceState::Connected(socket)
317 }
318 NetworkServiceState::Connected(_) => break,
319 NetworkServiceState::Sending(fut) => {
320 match ready!(fut.poll_unpin(cx)) {
321 Ok(maybe_socket) => match maybe_socket {
324 Some(socket) => NetworkServiceState::Connected(socket),
325 None => NetworkServiceState::Disconnected,
326 },
327 Err(_) => return Poll::Ready(Err(NetError::ServiceSocketChannelClosed)),
328 }
329 }
330 };
331 }
332 Poll::Ready(Ok(()))
333 }
334
335 fn call(&mut self, buf: Vec<u8>) -> Self::Future {
336 let (tx, rx) = oneshot::channel();
337
338 let mut socket = match std::mem::replace(&mut self.state, NetworkServiceState::Sending(rx))
339 {
340 NetworkServiceState::Connected(socket) => socket,
341 _ => panic!("poll_ready must be called first"),
342 };
343
344 Box::pin(async move {
345 match socket.send(&buf).await.context(net_error::FailedToSend) {
346 Ok(sent) => {
347 if sent != buf.len() {
349 socket.on_partial_send(buf.len(), sent);
350 }
351
352 let _ = tx.send(Some(socket));
355
356 Ok(sent)
357 }
358 Err(e) => {
359 let _ = tx.send(None);
362
363 Err(e)
364 }
365 }
366 })
367 }
368}