vector/sinks/util/service/net/
udp.rs

1use std::net::SocketAddr;
2
3use snafu::ResultExt;
4use tokio::net::UdpSocket;
5
6use vector_lib::configurable::configurable_component;
7
8use crate::{dns, net};
9
10use super::{net_error::*, ConnectorType, HostAndPort, NetError, NetworkConnector};
11
12/// UDP configuration.
13#[configurable_component]
14#[derive(Clone, Debug)]
15pub struct UdpConnectorConfig {
16    #[configurable(derived)]
17    address: HostAndPort,
18
19    /// The size of the socket's send buffer.
20    ///
21    /// If set, the value of the setting is passed via the `SO_SNDBUF` option.
22    #[configurable(metadata(docs::type_unit = "bytes"))]
23    #[configurable(metadata(docs::examples = 65536))]
24    send_buffer_size: Option<usize>,
25}
26
27impl UdpConnectorConfig {
28    pub const fn from_address(host: String, port: u16) -> Self {
29        Self {
30            address: HostAndPort { host, port },
31            send_buffer_size: None,
32        }
33    }
34
35    /// Creates a [`NetworkConnector`] from this UDP connector configuration.
36    pub fn as_connector(&self) -> NetworkConnector {
37        NetworkConnector {
38            inner: ConnectorType::Udp(UdpConnector {
39                address: self.address.clone(),
40                send_buffer_size: self.send_buffer_size,
41            }),
42        }
43    }
44}
45
46#[derive(Clone)]
47pub(super) struct UdpConnector {
48    address: HostAndPort,
49    send_buffer_size: Option<usize>,
50}
51
52impl UdpConnector {
53    pub(super) async fn connect(&self) -> Result<UdpSocket, NetError> {
54        let ip = dns::Resolver
55            .lookup_ip(self.address.host.clone())
56            .await
57            .context(FailedToResolve)?
58            .next()
59            .ok_or(NetError::NoAddresses)?;
60
61        let addr = SocketAddr::new(ip, self.address.port);
62        let bind_address = crate::sinks::util::udp::find_bind_address(&addr);
63
64        let socket = UdpSocket::bind(bind_address).await.context(FailedToBind)?;
65
66        if let Some(send_buffer_size) = self.send_buffer_size {
67            if let Err(error) = net::set_send_buffer_size(&socket, send_buffer_size) {
68                warn!(%error, "Failed configuring send buffer size on UDP socket.");
69            }
70        }
71
72        socket.connect(addr).await.context(FailedToConnect)?;
73
74        Ok(socket)
75    }
76}