vector/sinks/util/service/net/
udp.rs

1use std::net::SocketAddr;
2
3use snafu::ResultExt;
4use tokio::net::UdpSocket;
5use vector_lib::configurable::configurable_component;
6
7use super::{ConnectorType, HostAndPort, NetError, NetworkConnector, net_error::*};
8use crate::{dns, net};
9
// User-facing settings for a UDP connection: the remote endpoint plus an optional
// send buffer size. `as_connector` copies both values into a `NetworkConnector`.
// NOTE(review): the `///` text on this struct presumably feeds generated
// configuration docs via `configurable_component` — confirm before rewording it.
/// UDP configuration.
#[configurable_component]
#[derive(Clone, Debug)]
pub struct UdpConnectorConfig {
    // Remote host and port. No description is written here; `derived` presumably
    // pulls it from `HostAndPort` itself — TODO confirm against the macro docs.
    #[configurable(derived)]
    address: HostAndPort,

    /// The size of the socket's send buffer.
    ///
    /// If set, the value of the setting is passed via the `SO_SNDBUF` option.
    // When this is `None`, `UdpConnector::connect` never calls
    // `net::set_send_buffer_size`, so the socket keeps whatever size it was created with.
    #[configurable(metadata(docs::type_unit = "bytes"))]
    #[configurable(metadata(docs::examples = 65536))]
    send_buffer_size: Option<usize>,
}
24
25impl UdpConnectorConfig {
26    pub const fn from_address(host: String, port: u16) -> Self {
27        Self {
28            address: HostAndPort { host, port },
29            send_buffer_size: None,
30        }
31    }
32
33    /// Creates a [`NetworkConnector`] from this UDP connector configuration.
34    pub fn as_connector(&self) -> NetworkConnector {
35        NetworkConnector {
36            inner: ConnectorType::Udp(UdpConnector {
37                address: self.address.clone(),
38                send_buffer_size: self.send_buffer_size,
39            }),
40        }
41    }
42}
43
/// Settings needed to open a connected UDP socket.
///
/// Instances are created by `UdpConnectorConfig::as_connector`, which copies both
/// fields over from the configuration.
#[derive(Clone)]
pub(super) struct UdpConnector {
    /// Remote host and port; the host is resolved through DNS in `connect`.
    address: HostAndPort,
    /// Optional send buffer size to apply to the socket after it is bound.
    send_buffer_size: Option<usize>,
}
49
50impl UdpConnector {
51    pub(super) async fn connect(&self) -> Result<UdpSocket, NetError> {
52        let ip = dns::Resolver
53            .lookup_ip(self.address.host.clone())
54            .await
55            .context(FailedToResolve)?
56            .next()
57            .ok_or(NetError::NoAddresses)?;
58
59        let addr = SocketAddr::new(ip, self.address.port);
60        let bind_address = crate::sinks::util::udp::find_bind_address(&addr);
61
62        let socket = UdpSocket::bind(bind_address).await.context(FailedToBind)?;
63
64        if let Some(send_buffer_size) = self.send_buffer_size
65            && let Err(error) = net::set_send_buffer_size(&socket, send_buffer_size)
66        {
67            warn!(%error, "Failed configuring send buffer size on UDP socket.");
68        }
69
70        socket.connect(addr).await.context(FailedToConnect)?;
71
72        Ok(socket)
73    }
74}