vector/sinks/util/service/net/
tcp.rs

1use std::net::SocketAddr;
2
3use snafu::ResultExt;
4use tokio::net::TcpStream;
5
6use vector_lib::configurable::configurable_component;
7use vector_lib::{
8    tcp::TcpKeepaliveConfig,
9    tls::{MaybeTlsSettings, MaybeTlsStream, TlsEnableableConfig},
10};
11
12use crate::dns;
13
14use super::{net_error::*, ConnectorType, HostAndPort, NetError, NetworkConnector};
15
/// TCP configuration.
// NOTE(review): the `///` comments on this type and its fields are presumably picked up by
// `#[configurable_component]` for generated documentation — confirm before rewording them.
// Implementation notes below are therefore written as plain `//` comments.
#[configurable_component]
#[derive(Clone, Debug)]
pub struct TcpConnectorConfig {
    // Remote endpoint. When connecting, the host is resolved through DNS and paired with the
    // port; the host string is also handed to the TLS connect call.
    #[configurable(derived)]
    address: HostAndPort,

    // Optional keepalive settings, applied to the stream once it is connected. A failure to
    // apply them is logged as a warning and does not fail the connection.
    #[configurable(derived)]
    keepalive: Option<TcpKeepaliveConfig>,

    /// The size of the socket's send buffer.
    ///
    /// If set, the value of the setting is passed via the `SO_SNDBUF` option.
    // Applied to the stream once it is connected. A failure to apply it is logged as a warning
    // and does not fail the connection.
    #[configurable(metadata(docs::type_unit = "bytes"))]
    #[configurable(metadata(docs::examples = 65536))]
    send_buffer_size: Option<usize>,

    // Optional TLS settings, converted with `MaybeTlsSettings::from_config` at connect time.
    #[configurable(derived)]
    tls: Option<TlsEnableableConfig>,
}
36
37impl TcpConnectorConfig {
38    pub const fn from_address(host: String, port: u16) -> Self {
39        Self {
40            address: HostAndPort { host, port },
41            keepalive: None,
42            send_buffer_size: None,
43            tls: None,
44        }
45    }
46
47    /// Creates a [`NetworkConnector`] from this TCP connector configuration.
48    pub fn as_connector(&self) -> NetworkConnector {
49        NetworkConnector {
50            inner: ConnectorType::Tcp(TcpConnector {
51                address: self.address.clone(),
52                keepalive: self.keepalive,
53                send_buffer_size: self.send_buffer_size,
54                tls: self.tls.clone(),
55            }),
56        }
57    }
58}
59
/// Connector state produced by `TcpConnectorConfig::as_connector`.
///
/// Carries the same four settings as the configuration it was built from and uses them in
/// `connect` to establish the stream.
#[derive(Clone)]
pub(super) struct TcpConnector {
    /// Host and port of the remote endpoint; the host is resolved via DNS on each `connect` call.
    address: HostAndPort,
    /// Keepalive settings applied to the stream after connecting, if any.
    keepalive: Option<TcpKeepaliveConfig>,
    /// Requested send buffer size in bytes, applied to the stream after connecting, if any.
    send_buffer_size: Option<usize>,
    /// TLS settings used to build the `MaybeTlsSettings` that performs the connect, if any.
    tls: Option<TlsEnableableConfig>,
}
67
68impl TcpConnector {
69    pub(super) async fn connect(
70        &self,
71    ) -> Result<(SocketAddr, MaybeTlsStream<TcpStream>), NetError> {
72        let ip = dns::Resolver
73            .lookup_ip(self.address.host.clone())
74            .await
75            .context(FailedToResolve)?
76            .next()
77            .ok_or(NetError::NoAddresses)?;
78
79        let addr = SocketAddr::new(ip, self.address.port);
80
81        let tls = MaybeTlsSettings::from_config(self.tls.as_ref(), false)
82            .context(FailedToConfigureTLS)?;
83        let mut stream = tls
84            .connect(self.address.host.as_str(), &addr)
85            .await
86            .context(FailedToConnectTLS)?;
87
88        if let Some(send_buffer_size) = self.send_buffer_size {
89            if let Err(error) = stream.set_send_buffer_bytes(send_buffer_size) {
90                warn!(%error, "Failed configuring send buffer size on TCP socket.");
91            }
92        }
93
94        if let Some(keepalive) = self.keepalive {
95            if let Err(error) = stream.set_keepalive(keepalive) {
96                warn!(%error, "Failed configuring keepalive on TCP socket.");
97            }
98        }
99
100        Ok((addr, stream))
101    }
102}