1use std::{
2 net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr},
3 pin::Pin,
4 time::Duration,
5};
6
7use async_trait::async_trait;
8use futures::{stream::BoxStream, FutureExt, StreamExt};
9use snafu::{ResultExt, Snafu};
10use tokio::{net::UdpSocket, time::sleep};
11use tokio_util::codec::Encoder;
12use vector_lib::configurable::configurable_component;
13use vector_lib::internal_event::{BytesSent, Protocol, Registered};
14
15use super::{
16 datagram::{send_datagrams, DatagramSocket},
17 SinkBuildError,
18};
19use crate::{
20 codecs::Transformer,
21 dns,
22 event::Event,
23 internal_events::{UdpSocketConnectionEstablished, UdpSocketOutgoingConnectionError},
24 net,
25 sinks::{
26 util::{retries::ExponentialBackoff, StreamSink},
27 Healthcheck, VectorSink,
28 },
29};
30
31#[derive(Debug, Snafu)]
32pub enum UdpError {
33 #[snafu(display("Failed to create UDP listener socket, error = {:?}.", source))]
34 BindError { source: std::io::Error },
35 #[snafu(display("Connect error: {}", source))]
36 ConnectError { source: std::io::Error },
37 #[snafu(display("No addresses returned."))]
38 NoAddresses,
39 #[snafu(display("Unable to resolve DNS: {}", source))]
40 DnsError { source: crate::dns::DnsError },
41}
42
43#[configurable_component]
45#[derive(Clone, Debug)]
46pub struct UdpSinkConfig {
47 #[configurable(metadata(docs::examples = "92.12.333.224:5000"))]
53 #[configurable(metadata(docs::examples = "https://somehost:5000"))]
54 address: String,
55
56 #[configurable(metadata(docs::type_unit = "bytes"))]
60 #[configurable(metadata(docs::examples = 65536))]
61 send_buffer_bytes: Option<usize>,
62}
63
64impl UdpSinkConfig {
65 pub const fn from_address(address: String) -> Self {
66 Self {
67 address,
68 send_buffer_bytes: None,
69 }
70 }
71
72 fn build_connector(&self) -> crate::Result<UdpConnector> {
73 let uri = self.address.parse::<http::Uri>()?;
74 let host = uri.host().ok_or(SinkBuildError::MissingHost)?.to_string();
75 let port = uri.port_u16().ok_or(SinkBuildError::MissingPort)?;
76 Ok(UdpConnector::new(host, port, self.send_buffer_bytes))
77 }
78
79 pub fn build(
80 &self,
81 transformer: Transformer,
82 encoder: impl Encoder<Event, Error = vector_lib::codecs::encoding::Error>
83 + Clone
84 + Send
85 + Sync
86 + 'static,
87 ) -> crate::Result<(VectorSink, Healthcheck)> {
88 let connector = self.build_connector()?;
89 let sink = UdpSink::new(connector.clone(), transformer, encoder);
90 Ok((
91 VectorSink::from_event_streamsink(sink),
92 async move { connector.healthcheck().await }.boxed(),
93 ))
94 }
95}
96
97#[derive(Clone)]
98struct UdpConnector {
99 host: String,
100 port: u16,
101 send_buffer_bytes: Option<usize>,
102}
103
104impl UdpConnector {
105 const fn new(host: String, port: u16, send_buffer_bytes: Option<usize>) -> Self {
106 Self {
107 host,
108 port,
109 send_buffer_bytes,
110 }
111 }
112
113 const fn fresh_backoff() -> ExponentialBackoff {
114 ExponentialBackoff::from_millis(2)
116 .factor(250)
117 .max_delay(Duration::from_secs(60))
118 }
119
120 async fn connect(&self) -> Result<UdpSocket, UdpError> {
121 let ip = dns::Resolver
122 .lookup_ip(self.host.clone())
123 .await
124 .context(DnsSnafu)?
125 .next()
126 .ok_or(UdpError::NoAddresses)?;
127
128 let addr = SocketAddr::new(ip, self.port);
129 let bind_address = find_bind_address(&addr);
130
131 let socket = UdpSocket::bind(bind_address).await.context(BindSnafu)?;
132
133 if let Some(send_buffer_bytes) = self.send_buffer_bytes {
134 if let Err(error) = net::set_send_buffer_size(&socket, send_buffer_bytes) {
135 warn!(message = "Failed configuring send buffer size on UDP socket.", %error);
136 }
137 }
138
139 socket.connect(addr).await.context(ConnectSnafu)?;
140
141 Ok(socket)
142 }
143
144 async fn connect_backoff(&self) -> UdpSocket {
145 let mut backoff = Self::fresh_backoff();
146 loop {
147 match self.connect().await {
148 Ok(socket) => {
149 emit!(UdpSocketConnectionEstablished {});
150 return socket;
151 }
152 Err(error) => {
153 emit!(UdpSocketOutgoingConnectionError { error });
154 sleep(backoff.next().unwrap()).await;
155 }
156 }
157 }
158 }
159
160 async fn healthcheck(&self) -> crate::Result<()> {
161 self.connect().await.map(|_| ()).map_err(Into::into)
162 }
163}
164
165struct UdpSink<E>
166where
167 E: Encoder<Event, Error = vector_lib::codecs::encoding::Error> + Clone + Send + Sync,
168{
169 connector: UdpConnector,
170 transformer: Transformer,
171 encoder: E,
172 bytes_sent: Registered<BytesSent>,
173}
174
175impl<E> UdpSink<E>
176where
177 E: Encoder<Event, Error = vector_lib::codecs::encoding::Error> + Clone + Send + Sync,
178{
179 fn new(connector: UdpConnector, transformer: Transformer, encoder: E) -> Self {
180 Self {
181 connector,
182 transformer,
183 encoder,
184 bytes_sent: register!(BytesSent::from(Protocol::UDP)),
185 }
186 }
187}
188
189#[async_trait]
190impl<E> StreamSink<Event> for UdpSink<E>
191where
192 E: Encoder<Event, Error = vector_lib::codecs::encoding::Error> + Clone + Send + Sync,
193{
194 async fn run(self: Box<Self>, input: BoxStream<'_, Event>) -> Result<(), ()> {
195 let mut input = input.peekable();
196
197 let mut encoder = self.encoder.clone();
198 while Pin::new(&mut input).peek().await.is_some() {
199 let socket = self.connector.connect_backoff().await;
200 send_datagrams(
201 &mut input,
202 DatagramSocket::Udp(socket),
203 &self.transformer,
204 &mut encoder,
205 &self.bytes_sent,
206 )
207 .await;
208 }
209
210 Ok(())
211 }
212}
213
214pub(super) const fn find_bind_address(remote_addr: &SocketAddr) -> SocketAddr {
215 match remote_addr {
216 SocketAddr::V4(_) => SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), 0),
217 SocketAddr::V6(_) => SocketAddr::new(IpAddr::V6(Ipv6Addr::UNSPECIFIED), 0),
218 }
219}