vector/sinks/util/service/net/
mod.rs1mod tcp;
2mod udp;
3
4#[cfg(unix)]
5mod unix;
6
7use std::{
8 io,
9 net::SocketAddr,
10 task::{Context, Poll, ready},
11};
12
13use futures_util::{FutureExt, future::BoxFuture};
14use snafu::{ResultExt, Snafu};
15use tokio::{
16 io::AsyncWriteExt,
17 net::{TcpStream, UdpSocket},
18 sync::oneshot,
19 time::sleep,
20};
21use tower::Service;
22use vector_lib::{
23 configurable::configurable_component,
24 tls::{MaybeTlsStream, TlsError},
25};
26#[cfg(unix)]
27use {crate::sinks::util::unix::UnixEither, std::path::PathBuf};
28
29#[cfg(unix)]
30use self::unix::UnixConnector;
31#[cfg(unix)]
32pub use self::unix::{UnixConnectorConfig, UnixMode};
33use self::{tcp::TcpConnector, udp::UdpConnector};
34pub use self::{tcp::TcpConnectorConfig, udp::UdpConnectorConfig};
35#[cfg(unix)]
36use crate::internal_events::{UnixSendIncompleteError, UnixSocketConnectionEstablished};
37use crate::{
38 common::backoff::ExponentialBackoff,
39 internal_events::{
40 SocketOutgoingConnectionError, TcpSocketConnectionEstablished, UdpSendIncompleteError,
41 },
42 sinks::Healthcheck,
43};
44
#[configurable_component]
/// A host and port pair describing the endpoint to connect to.
///
/// The value is (de)serialized as a single string: the `serde` attribute below routes
/// deserialization through `TryFrom<String>` and serialization through `Into<String>`.
#[derive(Clone, Debug)]
#[serde(try_from = "String", into = "String")]
#[configurable(title = "The address to connect to.")]
#[configurable(metadata(docs::examples = "92.12.333.224:5000"))]
#[configurable(metadata(docs::examples = "somehost:5000"))]
struct HostAndPort {
    /// Host portion of the address.
    host: String,

    /// Port portion of the address.
    port: u16,
}
63
64impl TryFrom<String> for HostAndPort {
65 type Error = String;
66
67 fn try_from(value: String) -> Result<Self, Self::Error> {
68 let uri = value.parse::<http::Uri>().map_err(|e| e.to_string())?;
69 let host = uri
70 .host()
71 .ok_or_else(|| "missing host".to_string())?
72 .to_string();
73 let port = uri.port_u16().ok_or_else(|| "missing port".to_string())?;
74
75 Ok(Self { host, port })
76 }
77}
78
79impl From<HostAndPort> for String {
80 fn from(value: HostAndPort) -> Self {
81 format!("{}:{}", value.host, value.port)
82 }
83}
84
/// Errors produced while configuring, connecting, or sending over a network connection.
///
/// The `snafu` attribute generates the context selectors inside a module and without a suffix;
/// this file refers to one of them as `net_error::FailedToSend`.
#[derive(Debug, Snafu)]
#[snafu(module, context(suffix(false)), visibility(pub))]
pub enum NetError {
    /// The configured address was rejected; `reason` carries the explanation.
    #[snafu(display("Address is invalid: {}", reason))]
    InvalidAddress { reason: String },

    /// Resolving the address failed with the wrapped DNS error.
    #[snafu(display("Failed to resolve address: {}", source))]
    FailedToResolve { source: crate::dns::DnsError },

    /// Resolution yielded no addresses.
    #[snafu(display("No addresses returned."))]
    NoAddresses,

    /// Configuring the socket failed with the wrapped I/O error.
    #[snafu(display("Failed to configure socket: {}.", source))]
    FailedToConfigure { source: std::io::Error },

    /// Configuring TLS failed with the wrapped TLS error.
    #[snafu(display("Failed to configure TLS: {}.", source))]
    FailedToConfigureTLS { source: TlsError },

    /// Binding the socket failed with the wrapped I/O error.
    #[snafu(display("Failed to bind socket: {}.", source))]
    FailedToBind { source: std::io::Error },

    /// Sending a message failed with the wrapped I/O error.
    #[snafu(display("Failed to send message: {}", source))]
    FailedToSend { source: std::io::Error },

    /// Connecting to the endpoint failed with the wrapped I/O error.
    #[snafu(display("Failed to connect to endpoint: {}", source))]
    FailedToConnect { source: std::io::Error },

    /// Connecting to the TLS endpoint failed with the wrapped TLS error.
    #[snafu(display("Failed to connect to TLS endpoint: {}", source))]
    FailedToConnectTLS { source: TlsError },

    /// The channel that hands the socket back to the service after a send was closed before
    /// anything was sent on it. Returned from `NetworkService::poll_ready`.
    #[snafu(display("Failed to get socket back after send as channel closed unexpectedly."))]
    ServiceSocketChannelClosed,
}
118
/// Connection state machine driven by `NetworkService::poll_ready` and `NetworkService::call`.
enum NetworkServiceState {
    /// No connection exists; the next readiness poll starts a connection attempt.
    Disconnected,

    /// A connection attempt is in flight; the future resolves to the established connection.
    Connecting(BoxFuture<'static, NetworkConnection>),

    /// A connection is established and idle, ready to be used by the next `call`.
    Connected(NetworkConnection),

    /// A send is in flight and currently owns the connection.
    ///
    /// The send future reports back over this channel: `Some(connection)` returns the connection
    /// for reuse after a successful send, while `None` signals that the send failed and a new
    /// connection must be established.
    Sending(oneshot::Receiver<Option<NetworkConnection>>),
}
136
/// An established connection, one variant per supported transport.
enum NetworkConnection {
    /// A TCP stream, possibly wrapped in TLS.
    Tcp(MaybeTlsStream<TcpStream>),
    /// A UDP socket.
    Udp(UdpSocket),
    /// A Unix socket (only compiled on Unix targets).
    #[cfg(unix)]
    Unix(UnixEither),
}
143
144impl NetworkConnection {
145 fn on_partial_send(&self, data_size: usize, sent: usize) {
146 match self {
147 Self::Tcp(_) => {}
150 Self::Udp(_) => {
151 emit!(UdpSendIncompleteError { data_size, sent });
152 }
153 #[cfg(unix)]
154 Self::Unix(_) => {
155 emit!(UnixSendIncompleteError { data_size, sent });
156 }
157 }
158 }
159
160 async fn send(&mut self, buf: &[u8]) -> io::Result<usize> {
161 match self {
162 Self::Tcp(stream) => stream.write_all(buf).await.map(|()| buf.len()),
163 Self::Udp(socket) => socket.send(buf).await,
164 #[cfg(unix)]
165 Self::Unix(socket) => socket.send(buf).await,
166 }
167 }
168}
169
/// Details about a freshly established connection, used when emitting the
/// connection-established event in `NetworkConnector::on_connected`.
///
/// There is no UDP variant: `NetworkConnector::connect` returns no metadata for UDP.
enum ConnectionMetadata {
    /// Metadata for a TCP connection.
    Tcp {
        /// Address of the peer that was connected to.
        peer_addr: SocketAddr,
    },
    /// Metadata for a Unix socket connection (only compiled on Unix targets).
    #[cfg(unix)]
    Unix {
        /// Path reported by the Unix connector for the connection.
        path: PathBuf,
    },
}
179
/// The transport-specific connector wrapped by `NetworkConnector`.
#[derive(Clone)]
enum ConnectorType {
    /// Connects over TCP.
    Tcp(TcpConnector),
    /// Connects over UDP.
    Udp(UdpConnector),
    /// Connects over a Unix socket (only compiled on Unix targets).
    #[cfg(unix)]
    Unix(UnixConnector),
}
187
/// A connector for establishing connections to a network endpoint.
///
/// The underlying transport is TCP, UDP, or (on Unix targets) a Unix socket, depending on the
/// wrapped `ConnectorType`.
#[derive(Clone)]
pub struct NetworkConnector {
    /// The transport-specific connector that actually performs the connection.
    inner: ConnectorType,
}
195
196impl NetworkConnector {
197 fn on_connected(&self, metadata: ConnectionMetadata) {
198 match metadata {
199 ConnectionMetadata::Tcp { peer_addr } => {
200 emit!(TcpSocketConnectionEstablished {
201 peer_addr: Some(peer_addr)
202 });
203 }
204 #[cfg(unix)]
205 ConnectionMetadata::Unix { path } => {
206 emit!(UnixSocketConnectionEstablished { path: &path });
207 }
208 }
209 }
210
211 fn on_connection_error<E: std::error::Error>(&self, error: E) {
212 emit!(SocketOutgoingConnectionError { error });
213 }
214
215 async fn connect(&self) -> Result<(NetworkConnection, Option<ConnectionMetadata>), NetError> {
216 match &self.inner {
217 ConnectorType::Tcp(connector) => {
218 let (peer_addr, stream) = connector.connect().await?;
219
220 Ok((
221 NetworkConnection::Tcp(stream),
222 Some(ConnectionMetadata::Tcp { peer_addr }),
223 ))
224 }
225 ConnectorType::Udp(connector) => {
226 let socket = connector.connect().await?;
227
228 Ok((NetworkConnection::Udp(socket), None))
229 }
230 #[cfg(unix)]
231 ConnectorType::Unix(connector) => {
232 let (path, socket) = connector.connect().await?;
233
234 Ok((
235 NetworkConnection::Unix(socket),
236 Some(ConnectionMetadata::Unix { path }),
237 ))
238 }
239 }
240 }
241
242 async fn connect_backoff(&self) -> NetworkConnection {
243 let mut backoff = ExponentialBackoff::default();
245
246 loop {
247 match self.connect().await {
248 Ok((connection, maybe_metadata)) => {
249 if let Some(metadata) = maybe_metadata {
250 self.on_connected(metadata);
251 }
252
253 return connection;
254 }
255 Err(error) => {
256 self.on_connection_error(error);
257 sleep(backoff.next().unwrap()).await;
258 }
259 }
260 }
261 }
262
263 pub fn healthcheck(&self) -> Healthcheck {
265 let connector = self.clone();
266 Box::pin(async move { connector.connect().await.map(|_| ()).map_err(Into::into) })
267 }
268
269 pub fn service(&self) -> NetworkService {
271 NetworkService::new(self.clone())
272 }
273}
274
/// A `Service` that sends byte buffers to a network endpoint.
///
/// The service connects through its `NetworkConnector` as part of `poll_ready` and tracks the
/// connection lifecycle in `state`.
pub struct NetworkService {
    /// Connector used to establish (and re-establish) the connection.
    connector: NetworkConnector,
    /// Current position in the connection state machine.
    state: NetworkServiceState,
}
282
283impl NetworkService {
284 const fn new(connector: NetworkConnector) -> Self {
285 Self {
286 connector,
287 state: NetworkServiceState::Disconnected,
288 }
289 }
290}
291
292impl Service<Vec<u8>> for NetworkService {
293 type Response = usize;
294 type Error = NetError;
295 type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;
296
297 fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
298 loop {
299 self.state = match &mut self.state {
300 NetworkServiceState::Disconnected => {
301 let connector = self.connector.clone();
302 NetworkServiceState::Connecting(Box::pin(async move {
303 connector.connect_backoff().await
304 }))
305 }
306 NetworkServiceState::Connecting(fut) => {
307 let socket = ready!(fut.poll_unpin(cx));
308 NetworkServiceState::Connected(socket)
309 }
310 NetworkServiceState::Connected(_) => break,
311 NetworkServiceState::Sending(fut) => {
312 match ready!(fut.poll_unpin(cx)) {
313 Ok(maybe_socket) => match maybe_socket {
316 Some(socket) => NetworkServiceState::Connected(socket),
317 None => NetworkServiceState::Disconnected,
318 },
319 Err(_) => return Poll::Ready(Err(NetError::ServiceSocketChannelClosed)),
320 }
321 }
322 };
323 }
324 Poll::Ready(Ok(()))
325 }
326
327 fn call(&mut self, buf: Vec<u8>) -> Self::Future {
328 let (tx, rx) = oneshot::channel();
329
330 let mut socket = match std::mem::replace(&mut self.state, NetworkServiceState::Sending(rx))
331 {
332 NetworkServiceState::Connected(socket) => socket,
333 _ => panic!("poll_ready must be called first"),
334 };
335
336 Box::pin(async move {
337 match socket.send(&buf).await.context(net_error::FailedToSend) {
338 Ok(sent) => {
339 if sent != buf.len() {
341 socket.on_partial_send(buf.len(), sent);
342 }
343
344 let _ = tx.send(Some(socket));
347
348 Ok(sent)
349 }
350 Err(e) => {
351 let _ = tx.send(None);
354
355 Err(e)
356 }
357 }
358 })
359 }
360}