vector/sinks/util/service/net/
tcp.rs

1use std::net::SocketAddr;
2
3use snafu::ResultExt;
4use tokio::net::TcpStream;
5use vector_lib::{
6    configurable::configurable_component,
7    tcp::TcpKeepaliveConfig,
8    tls::{MaybeTlsSettings, MaybeTlsStream, TlsEnableableConfig},
9};
10
11use super::{ConnectorType, HostAndPort, NetError, NetworkConnector, net_error::*};
12use crate::dns;
13
14/// TCP configuration.
15#[configurable_component]
16#[derive(Clone, Debug)]
17pub struct TcpConnectorConfig {
18    #[configurable(derived)]
19    address: HostAndPort,
20
21    #[configurable(derived)]
22    keepalive: Option<TcpKeepaliveConfig>,
23
24    /// The size of the socket's send buffer.
25    ///
26    /// If set, the value of the setting is passed via the `SO_SNDBUF` option.
27    #[configurable(metadata(docs::type_unit = "bytes"))]
28    #[configurable(metadata(docs::examples = 65536))]
29    send_buffer_size: Option<usize>,
30
31    #[configurable(derived)]
32    tls: Option<TlsEnableableConfig>,
33}
34
35impl TcpConnectorConfig {
36    pub const fn from_address(host: String, port: u16) -> Self {
37        Self {
38            address: HostAndPort { host, port },
39            keepalive: None,
40            send_buffer_size: None,
41            tls: None,
42        }
43    }
44
45    /// Creates a [`NetworkConnector`] from this TCP connector configuration.
46    pub fn as_connector(&self) -> NetworkConnector {
47        NetworkConnector {
48            inner: ConnectorType::Tcp(TcpConnector {
49                address: self.address.clone(),
50                keepalive: self.keepalive,
51                send_buffer_size: self.send_buffer_size,
52                tls: self.tls.clone(),
53            }),
54        }
55    }
56}
57
58#[derive(Clone)]
59pub(super) struct TcpConnector {
60    address: HostAndPort,
61    keepalive: Option<TcpKeepaliveConfig>,
62    send_buffer_size: Option<usize>,
63    tls: Option<TlsEnableableConfig>,
64}
65
66impl TcpConnector {
67    pub(super) async fn connect(
68        &self,
69    ) -> Result<(SocketAddr, MaybeTlsStream<TcpStream>), NetError> {
70        let ip = dns::Resolver
71            .lookup_ip(self.address.host.clone())
72            .await
73            .context(FailedToResolve)?
74            .next()
75            .ok_or(NetError::NoAddresses)?;
76
77        let addr = SocketAddr::new(ip, self.address.port);
78
79        let tls = MaybeTlsSettings::from_config(self.tls.as_ref(), false)
80            .context(FailedToConfigureTLS)?;
81        let mut stream = tls
82            .connect(self.address.host.as_str(), &addr)
83            .await
84            .context(FailedToConnectTLS)?;
85
86        if let Some(send_buffer_size) = self.send_buffer_size
87            && let Err(error) = stream.set_send_buffer_bytes(send_buffer_size)
88        {
89            warn!(%error, "Failed configuring send buffer size on TCP socket.");
90        }
91
92        if let Some(keepalive) = self.keepalive
93            && let Err(error) = stream.set_keepalive(keepalive)
94        {
95            warn!(%error, "Failed configuring keepalive on TCP socket.");
96        }
97
98        Ok((addr, stream))
99    }
100}