vector/sinks/util/service/net/
tcp.rs1use std::net::SocketAddr;
2
3use snafu::ResultExt;
4use tokio::net::TcpStream;
5use vector_lib::{
6 configurable::configurable_component,
7 tcp::TcpKeepaliveConfig,
8 tls::{MaybeTlsSettings, MaybeTlsStream, TlsEnableableConfig},
9};
10
11use super::{ConnectorType, HostAndPort, NetError, NetworkConnector, net_error::*};
12use crate::dns;
13
14#[configurable_component]
16#[derive(Clone, Debug)]
17pub struct TcpConnectorConfig {
18 #[configurable(derived)]
19 address: HostAndPort,
20
21 #[configurable(derived)]
22 keepalive: Option<TcpKeepaliveConfig>,
23
24 #[configurable(metadata(docs::type_unit = "bytes"))]
28 #[configurable(metadata(docs::examples = 65536))]
29 send_buffer_size: Option<usize>,
30
31 #[configurable(derived)]
32 tls: Option<TlsEnableableConfig>,
33}
34
35impl TcpConnectorConfig {
36 pub const fn from_address(host: String, port: u16) -> Self {
37 Self {
38 address: HostAndPort { host, port },
39 keepalive: None,
40 send_buffer_size: None,
41 tls: None,
42 }
43 }
44
45 pub fn as_connector(&self) -> NetworkConnector {
47 NetworkConnector {
48 inner: ConnectorType::Tcp(TcpConnector {
49 address: self.address.clone(),
50 keepalive: self.keepalive,
51 send_buffer_size: self.send_buffer_size,
52 tls: self.tls.clone(),
53 }),
54 }
55 }
56}
57
58#[derive(Clone)]
59pub(super) struct TcpConnector {
60 address: HostAndPort,
61 keepalive: Option<TcpKeepaliveConfig>,
62 send_buffer_size: Option<usize>,
63 tls: Option<TlsEnableableConfig>,
64}
65
66impl TcpConnector {
67 pub(super) async fn connect(
68 &self,
69 ) -> Result<(SocketAddr, MaybeTlsStream<TcpStream>), NetError> {
70 let ip = dns::Resolver
71 .lookup_ip(self.address.host.clone())
72 .await
73 .context(FailedToResolve)?
74 .next()
75 .ok_or(NetError::NoAddresses)?;
76
77 let addr = SocketAddr::new(ip, self.address.port);
78
79 let tls = MaybeTlsSettings::from_config(self.tls.as_ref(), false)
80 .context(FailedToConfigureTLS)?;
81 let mut stream = tls
82 .connect(self.address.host.as_str(), &addr)
83 .await
84 .context(FailedToConnectTLS)?;
85
86 if let Some(send_buffer_size) = self.send_buffer_size
87 && let Err(error) = stream.set_send_buffer_bytes(send_buffer_size)
88 {
89 warn!(%error, "Failed configuring send buffer size on TCP socket.");
90 }
91
92 if let Some(keepalive) = self.keepalive
93 && let Err(error) = stream.set_keepalive(keepalive)
94 {
95 warn!(%error, "Failed configuring keepalive on TCP socket.");
96 }
97
98 Ok((addr, stream))
99 }
100}