vector/sinks/util/service/net/
mod.rs1mod tcp;
2mod udp;
3
4#[cfg(unix)]
5mod unix;
6
7use std::{
8 io,
9 net::SocketAddr,
10 task::{Context, Poll, ready},
11 time::Duration,
12};
13
14use futures_util::{FutureExt, future::BoxFuture};
15use snafu::{ResultExt, Snafu};
16use tokio::{
17 io::AsyncWriteExt,
18 net::{TcpStream, UdpSocket},
19 sync::oneshot,
20 time::sleep,
21};
22use tower::Service;
23use vector_lib::{
24 configurable::configurable_component,
25 tls::{MaybeTlsStream, TlsError},
26};
27#[cfg(unix)]
28use {crate::sinks::util::unix::UnixEither, std::path::PathBuf};
29
30#[cfg(unix)]
31use self::unix::UnixConnector;
32#[cfg(unix)]
33pub use self::unix::{UnixConnectorConfig, UnixMode};
34use self::{tcp::TcpConnector, udp::UdpConnector};
35pub use self::{tcp::TcpConnectorConfig, udp::UdpConnectorConfig};
36#[cfg(unix)]
37use crate::internal_events::{UnixSendIncompleteError, UnixSocketConnectionEstablished};
38use crate::{
39 common::backoff::ExponentialBackoff,
40 internal_events::{
41 SocketOutgoingConnectionError, TcpSocketConnectionEstablished, UdpSendIncompleteError,
42 },
43 sinks::Healthcheck,
44};
45
46#[configurable_component]
52#[derive(Clone, Debug)]
53#[serde(try_from = "String", into = "String")]
54#[configurable(title = "The address to connect to.")]
55#[configurable(metadata(docs::examples = "92.12.333.224:5000"))]
56#[configurable(metadata(docs::examples = "somehost:5000"))]
57struct HostAndPort {
58 host: String,
60
61 port: u16,
63}
64
65impl TryFrom<String> for HostAndPort {
66 type Error = String;
67
68 fn try_from(value: String) -> Result<Self, Self::Error> {
69 let uri = value.parse::<http::Uri>().map_err(|e| e.to_string())?;
70 let host = uri
71 .host()
72 .ok_or_else(|| "missing host".to_string())?
73 .to_string();
74 let port = uri.port_u16().ok_or_else(|| "missing port".to_string())?;
75
76 Ok(Self { host, port })
77 }
78}
79
80impl From<HostAndPort> for String {
81 fn from(value: HostAndPort) -> Self {
82 format!("{}:{}", value.host, value.port)
83 }
84}
85
86#[derive(Debug, Snafu)]
87#[snafu(module, context(suffix(false)), visibility(pub))]
88pub enum NetError {
89 #[snafu(display("Address is invalid: {}", reason))]
90 InvalidAddress { reason: String },
91
92 #[snafu(display("Failed to resolve address: {}", source))]
93 FailedToResolve { source: crate::dns::DnsError },
94
95 #[snafu(display("No addresses returned."))]
96 NoAddresses,
97
98 #[snafu(display("Failed to configure socket: {}.", source))]
99 FailedToConfigure { source: std::io::Error },
100
101 #[snafu(display("Failed to configure TLS: {}.", source))]
102 FailedToConfigureTLS { source: TlsError },
103
104 #[snafu(display("Failed to bind socket: {}.", source))]
105 FailedToBind { source: std::io::Error },
106
107 #[snafu(display("Failed to send message: {}", source))]
108 FailedToSend { source: std::io::Error },
109
110 #[snafu(display("Failed to connect to endpoint: {}", source))]
111 FailedToConnect { source: std::io::Error },
112
113 #[snafu(display("Failed to connect to TLS endpoint: {}", source))]
114 FailedToConnectTLS { source: TlsError },
115
116 #[snafu(display("Failed to get socket back after send as channel closed unexpectedly."))]
117 ServiceSocketChannelClosed,
118}
119
120enum NetworkServiceState {
121 Disconnected,
123
124 Connecting(BoxFuture<'static, NetworkConnection>),
126
127 Connected(NetworkConnection),
129
130 Sending(oneshot::Receiver<Option<NetworkConnection>>),
136}
137
138enum NetworkConnection {
139 Tcp(MaybeTlsStream<TcpStream>),
140 Udp(UdpSocket),
141 #[cfg(unix)]
142 Unix(UnixEither),
143}
144
145impl NetworkConnection {
146 fn on_partial_send(&self, data_size: usize, sent: usize) {
147 match self {
148 Self::Tcp(_) => {}
151 Self::Udp(_) => {
152 emit!(UdpSendIncompleteError { data_size, sent });
153 }
154 #[cfg(unix)]
155 Self::Unix(_) => {
156 emit!(UnixSendIncompleteError { data_size, sent });
157 }
158 }
159 }
160
161 async fn send(&mut self, buf: &[u8]) -> io::Result<usize> {
162 match self {
163 Self::Tcp(stream) => stream.write_all(buf).await.map(|()| buf.len()),
164 Self::Udp(socket) => socket.send(buf).await,
165 #[cfg(unix)]
166 Self::Unix(socket) => socket.send(buf).await,
167 }
168 }
169}
170
/// Details about a freshly established connection, used when emitting the
/// "connection established" events.
///
/// There is no UDP variant: `NetworkConnector::connect` returns no metadata for UDP.
enum ConnectionMetadata {
    /// A TCP connection and the peer address it was made to.
    Tcp { peer_addr: SocketAddr },
    /// A Unix socket connection and the socket path it was made to.
    #[cfg(unix)]
    Unix { path: PathBuf },
}
180
181#[derive(Clone)]
182enum ConnectorType {
183 Tcp(TcpConnector),
184 Udp(UdpConnector),
185 #[cfg(unix)]
186 Unix(UnixConnector),
187}
188
189#[derive(Clone)]
193pub struct NetworkConnector {
194 inner: ConnectorType,
195}
196
197impl NetworkConnector {
198 fn on_connected(&self, metadata: ConnectionMetadata) {
199 match metadata {
200 ConnectionMetadata::Tcp { peer_addr } => {
201 emit!(TcpSocketConnectionEstablished {
202 peer_addr: Some(peer_addr)
203 });
204 }
205 #[cfg(unix)]
206 ConnectionMetadata::Unix { path } => {
207 emit!(UnixSocketConnectionEstablished { path: &path });
208 }
209 }
210 }
211
212 fn on_connection_error<E: std::error::Error>(&self, error: E) {
213 emit!(SocketOutgoingConnectionError { error });
214 }
215
216 async fn connect(&self) -> Result<(NetworkConnection, Option<ConnectionMetadata>), NetError> {
217 match &self.inner {
218 ConnectorType::Tcp(connector) => {
219 let (peer_addr, stream) = connector.connect().await?;
220
221 Ok((
222 NetworkConnection::Tcp(stream),
223 Some(ConnectionMetadata::Tcp { peer_addr }),
224 ))
225 }
226 ConnectorType::Udp(connector) => {
227 let socket = connector.connect().await?;
228
229 Ok((NetworkConnection::Udp(socket), None))
230 }
231 #[cfg(unix)]
232 ConnectorType::Unix(connector) => {
233 let (path, socket) = connector.connect().await?;
234
235 Ok((
236 NetworkConnection::Unix(socket),
237 Some(ConnectionMetadata::Unix { path }),
238 ))
239 }
240 }
241 }
242
243 async fn connect_backoff(&self) -> NetworkConnection {
244 let mut backoff = ExponentialBackoff::from_millis(2)
246 .factor(250)
247 .max_delay(Duration::from_secs(60));
248
249 loop {
250 match self.connect().await {
251 Ok((connection, maybe_metadata)) => {
252 if let Some(metadata) = maybe_metadata {
253 self.on_connected(metadata);
254 }
255
256 return connection;
257 }
258 Err(error) => {
259 self.on_connection_error(error);
260 sleep(backoff.next().unwrap()).await;
261 }
262 }
263 }
264 }
265
266 pub fn healthcheck(&self) -> Healthcheck {
268 let connector = self.clone();
269 Box::pin(async move { connector.connect().await.map(|_| ()).map_err(Into::into) })
270 }
271
272 pub fn service(&self) -> NetworkService {
274 NetworkService::new(self.clone())
275 }
276}
277
278pub struct NetworkService {
282 connector: NetworkConnector,
283 state: NetworkServiceState,
284}
285
286impl NetworkService {
287 const fn new(connector: NetworkConnector) -> Self {
288 Self {
289 connector,
290 state: NetworkServiceState::Disconnected,
291 }
292 }
293}
294
295impl Service<Vec<u8>> for NetworkService {
296 type Response = usize;
297 type Error = NetError;
298 type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;
299
300 fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
301 loop {
302 self.state = match &mut self.state {
303 NetworkServiceState::Disconnected => {
304 let connector = self.connector.clone();
305 NetworkServiceState::Connecting(Box::pin(async move {
306 connector.connect_backoff().await
307 }))
308 }
309 NetworkServiceState::Connecting(fut) => {
310 let socket = ready!(fut.poll_unpin(cx));
311 NetworkServiceState::Connected(socket)
312 }
313 NetworkServiceState::Connected(_) => break,
314 NetworkServiceState::Sending(fut) => {
315 match ready!(fut.poll_unpin(cx)) {
316 Ok(maybe_socket) => match maybe_socket {
319 Some(socket) => NetworkServiceState::Connected(socket),
320 None => NetworkServiceState::Disconnected,
321 },
322 Err(_) => return Poll::Ready(Err(NetError::ServiceSocketChannelClosed)),
323 }
324 }
325 };
326 }
327 Poll::Ready(Ok(()))
328 }
329
330 fn call(&mut self, buf: Vec<u8>) -> Self::Future {
331 let (tx, rx) = oneshot::channel();
332
333 let mut socket = match std::mem::replace(&mut self.state, NetworkServiceState::Sending(rx))
334 {
335 NetworkServiceState::Connected(socket) => socket,
336 _ => panic!("poll_ready must be called first"),
337 };
338
339 Box::pin(async move {
340 match socket.send(&buf).await.context(net_error::FailedToSend) {
341 Ok(sent) => {
342 if sent != buf.len() {
344 socket.on_partial_send(buf.len(), sent);
345 }
346
347 let _ = tx.send(Some(socket));
350
351 Ok(sent)
352 }
353 Err(e) => {
354 let _ = tx.send(None);
357
358 Err(e)
359 }
360 }
361 })
362 }
363}