vector/sinks/util/service/net/
udp.rs1use std::net::SocketAddr;
2
3use snafu::ResultExt;
4use tokio::net::UdpSocket;
5use vector_lib::configurable::configurable_component;
6
7use super::{ConnectorType, HostAndPort, NetError, NetworkConnector, net_error::*};
8use crate::{dns, net};
9
10#[configurable_component]
12#[derive(Clone, Debug)]
13pub struct UdpConnectorConfig {
14 #[configurable(derived)]
15 address: HostAndPort,
16
17 #[configurable(metadata(docs::type_unit = "bytes"))]
21 #[configurable(metadata(docs::examples = 65536))]
22 send_buffer_size: Option<usize>,
23}
24
25impl UdpConnectorConfig {
26 pub const fn from_address(host: String, port: u16) -> Self {
27 Self {
28 address: HostAndPort { host, port },
29 send_buffer_size: None,
30 }
31 }
32
33 pub fn as_connector(&self) -> NetworkConnector {
35 NetworkConnector {
36 inner: ConnectorType::Udp(UdpConnector {
37 address: self.address.clone(),
38 send_buffer_size: self.send_buffer_size,
39 }),
40 }
41 }
42}
43
44#[derive(Clone)]
45pub(super) struct UdpConnector {
46 address: HostAndPort,
47 send_buffer_size: Option<usize>,
48}
49
50impl UdpConnector {
51 pub(super) async fn connect(&self) -> Result<UdpSocket, NetError> {
52 let ip = dns::Resolver
53 .lookup_ip(self.address.host.clone())
54 .await
55 .context(FailedToResolve)?
56 .next()
57 .ok_or(NetError::NoAddresses)?;
58
59 let addr = SocketAddr::new(ip, self.address.port);
60 let bind_address = crate::sinks::util::udp::find_bind_address(&addr);
61
62 let socket = UdpSocket::bind(bind_address).await.context(FailedToBind)?;
63
64 if let Some(send_buffer_size) = self.send_buffer_size
65 && let Err(error) = net::set_send_buffer_size(&socket, send_buffer_size)
66 {
67 warn!(%error, "Failed configuring send buffer size on UDP socket.");
68 }
69
70 socket.connect(addr).await.context(FailedToConnect)?;
71
72 Ok(socket)
73 }
74}