// vector/sinks/util/service/net/udp.rs
use std::net::SocketAddr;
2
3use snafu::ResultExt;
4use tokio::net::UdpSocket;
5
6use vector_lib::configurable::configurable_component;
7
8use crate::{dns, net};
9
10use super::{net_error::*, ConnectorType, HostAndPort, NetError, NetworkConnector};
11
12#[configurable_component]
14#[derive(Clone, Debug)]
15pub struct UdpConnectorConfig {
16 #[configurable(derived)]
17 address: HostAndPort,
18
19 #[configurable(metadata(docs::type_unit = "bytes"))]
23 #[configurable(metadata(docs::examples = 65536))]
24 send_buffer_size: Option<usize>,
25}
26
27impl UdpConnectorConfig {
28 pub const fn from_address(host: String, port: u16) -> Self {
29 Self {
30 address: HostAndPort { host, port },
31 send_buffer_size: None,
32 }
33 }
34
35 pub fn as_connector(&self) -> NetworkConnector {
37 NetworkConnector {
38 inner: ConnectorType::Udp(UdpConnector {
39 address: self.address.clone(),
40 send_buffer_size: self.send_buffer_size,
41 }),
42 }
43 }
44}
45
46#[derive(Clone)]
47pub(super) struct UdpConnector {
48 address: HostAndPort,
49 send_buffer_size: Option<usize>,
50}
51
52impl UdpConnector {
53 pub(super) async fn connect(&self) -> Result<UdpSocket, NetError> {
54 let ip = dns::Resolver
55 .lookup_ip(self.address.host.clone())
56 .await
57 .context(FailedToResolve)?
58 .next()
59 .ok_or(NetError::NoAddresses)?;
60
61 let addr = SocketAddr::new(ip, self.address.port);
62 let bind_address = crate::sinks::util::udp::find_bind_address(&addr);
63
64 let socket = UdpSocket::bind(bind_address).await.context(FailedToBind)?;
65
66 if let Some(send_buffer_size) = self.send_buffer_size {
67 if let Err(error) = net::set_send_buffer_size(&socket, send_buffer_size) {
68 warn!(%error, "Failed configuring send buffer size on UDP socket.");
69 }
70 }
71
72 socket.connect(addr).await.context(FailedToConnect)?;
73
74 Ok(socket)
75 }
76}