vector/sinks/util/service/net/
tcp.rs1use std::net::SocketAddr;
2
3use snafu::ResultExt;
4use tokio::net::TcpStream;
5
6use vector_lib::configurable::configurable_component;
7use vector_lib::{
8 tcp::TcpKeepaliveConfig,
9 tls::{MaybeTlsSettings, MaybeTlsStream, TlsEnableableConfig},
10};
11
12use crate::dns;
13
14use super::{net_error::*, ConnectorType, HostAndPort, NetError, NetworkConnector};
15
16#[configurable_component]
18#[derive(Clone, Debug)]
19pub struct TcpConnectorConfig {
20 #[configurable(derived)]
21 address: HostAndPort,
22
23 #[configurable(derived)]
24 keepalive: Option<TcpKeepaliveConfig>,
25
26 #[configurable(metadata(docs::type_unit = "bytes"))]
30 #[configurable(metadata(docs::examples = 65536))]
31 send_buffer_size: Option<usize>,
32
33 #[configurable(derived)]
34 tls: Option<TlsEnableableConfig>,
35}
36
37impl TcpConnectorConfig {
38 pub const fn from_address(host: String, port: u16) -> Self {
39 Self {
40 address: HostAndPort { host, port },
41 keepalive: None,
42 send_buffer_size: None,
43 tls: None,
44 }
45 }
46
47 pub fn as_connector(&self) -> NetworkConnector {
49 NetworkConnector {
50 inner: ConnectorType::Tcp(TcpConnector {
51 address: self.address.clone(),
52 keepalive: self.keepalive,
53 send_buffer_size: self.send_buffer_size,
54 tls: self.tls.clone(),
55 }),
56 }
57 }
58}
59
60#[derive(Clone)]
61pub(super) struct TcpConnector {
62 address: HostAndPort,
63 keepalive: Option<TcpKeepaliveConfig>,
64 send_buffer_size: Option<usize>,
65 tls: Option<TlsEnableableConfig>,
66}
67
68impl TcpConnector {
69 pub(super) async fn connect(
70 &self,
71 ) -> Result<(SocketAddr, MaybeTlsStream<TcpStream>), NetError> {
72 let ip = dns::Resolver
73 .lookup_ip(self.address.host.clone())
74 .await
75 .context(FailedToResolve)?
76 .next()
77 .ok_or(NetError::NoAddresses)?;
78
79 let addr = SocketAddr::new(ip, self.address.port);
80
81 let tls = MaybeTlsSettings::from_config(self.tls.as_ref(), false)
82 .context(FailedToConfigureTLS)?;
83 let mut stream = tls
84 .connect(self.address.host.as_str(), &addr)
85 .await
86 .context(FailedToConnectTLS)?;
87
88 if let Some(send_buffer_size) = self.send_buffer_size {
89 if let Err(error) = stream.set_send_buffer_bytes(send_buffer_size) {
90 warn!(%error, "Failed configuring send buffer size on TCP socket.");
91 }
92 }
93
94 if let Some(keepalive) = self.keepalive {
95 if let Err(error) = stream.set_keepalive(keepalive) {
96 warn!(%error, "Failed configuring keepalive on TCP socket.");
97 }
98 }
99
100 Ok((addr, stream))
101 }
102}