1use std::{
2 net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr},
3 pin::Pin,
4 time::Duration,
5};
6
7use async_trait::async_trait;
8use futures::{stream::BoxStream, FutureExt, StreamExt};
9use snafu::{ResultExt, Snafu};
10use tokio::{net::UdpSocket, time::sleep};
11use tokio_util::codec::Encoder;
12use vector_lib::configurable::configurable_component;
13use vector_lib::internal_event::{BytesSent, Protocol, Registered};
14
15use super::{
16 datagram::{send_datagrams, DatagramSocket},
17 SinkBuildError,
18};
19use crate::{
20 codecs::Transformer,
21 common::backoff::ExponentialBackoff,
22 dns,
23 event::Event,
24 internal_events::{UdpSocketConnectionEstablished, UdpSocketOutgoingConnectionError},
25 net,
26 sinks::{util::StreamSink, Healthcheck, VectorSink},
27};
28
/// Errors that can occur while establishing an outgoing UDP socket.
#[derive(Debug, Snafu)]
pub enum UdpError {
    /// Binding the local UDP socket failed.
    #[snafu(display("Failed to create UDP listener socket, error = {:?}.", source))]
    BindError { source: std::io::Error },
    /// Connecting the bound socket to the remote address failed.
    #[snafu(display("Connect error: {}", source))]
    ConnectError { source: std::io::Error },
    /// The DNS lookup completed but yielded no IP address.
    #[snafu(display("No addresses returned."))]
    NoAddresses,
    /// The DNS lookup of the configured host failed.
    #[snafu(display("Unable to resolve DNS: {}", source))]
    DnsError { source: crate::dns::DnsError },
}
40
/// Configuration for a UDP sink.
#[configurable_component]
#[derive(Clone, Debug)]
pub struct UdpSinkConfig {
    /// The address to connect to.
    ///
    /// The value is parsed as a URI and must contain both a host and a port. The host is
    /// resolved through a DNS lookup each time a connection is attempted.
    // NOTE(review): the first example below contains the octet `333`, which is not a valid
    // IPv4 octet — confirm whether that is intentional.
    #[configurable(metadata(docs::examples = "92.12.333.224:5000"))]
    #[configurable(metadata(docs::examples = "https://somehost:5000"))]
    address: String,

    /// The size of the socket's send buffer.
    ///
    /// If set, the value is applied to the UDP socket before it is connected. A failure to
    /// apply it is logged as a warning and does not abort the connection attempt.
    #[configurable(metadata(docs::type_unit = "bytes"))]
    #[configurable(metadata(docs::examples = 65536))]
    send_buffer_bytes: Option<usize>,
}
61
62impl UdpSinkConfig {
63 pub const fn from_address(address: String) -> Self {
64 Self {
65 address,
66 send_buffer_bytes: None,
67 }
68 }
69
70 fn build_connector(&self) -> crate::Result<UdpConnector> {
71 let uri = self.address.parse::<http::Uri>()?;
72 let host = uri.host().ok_or(SinkBuildError::MissingHost)?.to_string();
73 let port = uri.port_u16().ok_or(SinkBuildError::MissingPort)?;
74 Ok(UdpConnector::new(host, port, self.send_buffer_bytes))
75 }
76
77 pub fn build(
78 &self,
79 transformer: Transformer,
80 encoder: impl Encoder<Event, Error = vector_lib::codecs::encoding::Error>
81 + Clone
82 + Send
83 + Sync
84 + 'static,
85 ) -> crate::Result<(VectorSink, Healthcheck)> {
86 let connector = self.build_connector()?;
87 let sink = UdpSink::new(connector.clone(), transformer, encoder);
88 Ok((
89 VectorSink::from_event_streamsink(sink),
90 async move { connector.healthcheck().await }.boxed(),
91 ))
92 }
93}
94
/// Holds the settings needed to open a connected UDP socket to a remote host.
#[derive(Clone)]
struct UdpConnector {
    /// Host that is resolved through DNS on every connection attempt.
    host: String,
    /// Remote port the socket is connected to.
    port: u16,
    /// Optional send buffer size to apply to the socket before connecting.
    send_buffer_bytes: Option<usize>,
}
101
102impl UdpConnector {
103 const fn new(host: String, port: u16, send_buffer_bytes: Option<usize>) -> Self {
104 Self {
105 host,
106 port,
107 send_buffer_bytes,
108 }
109 }
110
111 const fn fresh_backoff() -> ExponentialBackoff {
112 ExponentialBackoff::from_millis(2)
114 .factor(250)
115 .max_delay(Duration::from_secs(60))
116 }
117
118 async fn connect(&self) -> Result<UdpSocket, UdpError> {
119 let ip = dns::Resolver
120 .lookup_ip(self.host.clone())
121 .await
122 .context(DnsSnafu)?
123 .next()
124 .ok_or(UdpError::NoAddresses)?;
125
126 let addr = SocketAddr::new(ip, self.port);
127 let bind_address = find_bind_address(&addr);
128
129 let socket = UdpSocket::bind(bind_address).await.context(BindSnafu)?;
130
131 if let Some(send_buffer_bytes) = self.send_buffer_bytes {
132 if let Err(error) = net::set_send_buffer_size(&socket, send_buffer_bytes) {
133 warn!(message = "Failed configuring send buffer size on UDP socket.", %error);
134 }
135 }
136
137 socket.connect(addr).await.context(ConnectSnafu)?;
138
139 Ok(socket)
140 }
141
142 async fn connect_backoff(&self) -> UdpSocket {
143 let mut backoff = Self::fresh_backoff();
144 loop {
145 match self.connect().await {
146 Ok(socket) => {
147 emit!(UdpSocketConnectionEstablished {});
148 return socket;
149 }
150 Err(error) => {
151 emit!(UdpSocketOutgoingConnectionError { error });
152 sleep(backoff.next().unwrap()).await;
153 }
154 }
155 }
156 }
157
158 async fn healthcheck(&self) -> crate::Result<()> {
159 self.connect().await.map(|_| ()).map_err(Into::into)
160 }
161}
162
/// Stream sink that sends events as datagrams over a UDP socket.
struct UdpSink<E>
where
    E: Encoder<Event, Error = vector_lib::codecs::encoding::Error> + Clone + Send + Sync,
{
    /// Opens (and re-opens) the UDP socket the datagrams are sent on.
    connector: UdpConnector,
    /// Passed to `send_datagrams` together with the encoder for each batch of events.
    transformer: Transformer,
    /// Encoder that is cloned once when the sink starts running.
    encoder: E,
    /// Handle for the `BytesSent` internal event, registered with the UDP protocol.
    bytes_sent: Registered<BytesSent>,
}
172
173impl<E> UdpSink<E>
174where
175 E: Encoder<Event, Error = vector_lib::codecs::encoding::Error> + Clone + Send + Sync,
176{
177 fn new(connector: UdpConnector, transformer: Transformer, encoder: E) -> Self {
178 Self {
179 connector,
180 transformer,
181 encoder,
182 bytes_sent: register!(BytesSent::from(Protocol::UDP)),
183 }
184 }
185}
186
187#[async_trait]
188impl<E> StreamSink<Event> for UdpSink<E>
189where
190 E: Encoder<Event, Error = vector_lib::codecs::encoding::Error> + Clone + Send + Sync,
191{
192 async fn run(self: Box<Self>, input: BoxStream<'_, Event>) -> Result<(), ()> {
193 let mut input = input.peekable();
194
195 let mut encoder = self.encoder.clone();
196 while Pin::new(&mut input).peek().await.is_some() {
197 let socket = self.connector.connect_backoff().await;
198 send_datagrams(
199 &mut input,
200 DatagramSocket::Udp(socket),
201 &self.transformer,
202 &mut encoder,
203 &self.bytes_sent,
204 )
205 .await;
206 }
207
208 Ok(())
209 }
210}
211
212pub(super) const fn find_bind_address(remote_addr: &SocketAddr) -> SocketAddr {
213 match remote_addr {
214 SocketAddr::V4(_) => SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), 0),
215 SocketAddr::V6(_) => SocketAddr::new(IpAddr::V6(Ipv6Addr::UNSPECIFIED), 0),
216 }
217}