1use std::{
2 net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr},
3 pin::Pin,
4 time::Duration,
5};
6
7use async_trait::async_trait;
8use futures::{FutureExt, StreamExt, stream::BoxStream};
9use snafu::{ResultExt, Snafu};
10use tokio::{net::UdpSocket, time::sleep};
11use tokio_util::codec::Encoder;
12use vector_lib::{
13 codecs::encoding::Chunker,
14 configurable::configurable_component,
15 internal_event::{BytesSent, Protocol, Registered},
16};
17
18use super::{
19 SinkBuildError,
20 datagram::{DatagramSocket, send_datagrams},
21};
22use crate::{
23 codecs::Transformer,
24 common::backoff::ExponentialBackoff,
25 dns,
26 event::Event,
27 internal_events::{UdpSocketConnectionEstablished, UdpSocketOutgoingConnectionError},
28 net,
29 sinks::{Healthcheck, VectorSink, util::StreamSink},
30};
31
32#[derive(Debug, Snafu)]
33pub enum UdpError {
34 #[snafu(display("Failed to create UDP listener socket, error = {:?}.", source))]
35 BindError { source: std::io::Error },
36 #[snafu(display("Connect error: {}", source))]
37 ConnectError { source: std::io::Error },
38 #[snafu(display("No addresses returned."))]
39 NoAddresses,
40 #[snafu(display("Unable to resolve DNS: {}", source))]
41 DnsError { source: crate::dns::DnsError },
42}
43
44#[configurable_component]
46#[derive(Clone, Debug)]
47pub struct UdpSinkConfig {
48 #[configurable(metadata(docs::examples = "92.12.333.224:5000"))]
54 #[configurable(metadata(docs::examples = "https://somehost:5000"))]
55 address: String,
56
57 #[configurable(metadata(docs::type_unit = "bytes"))]
61 #[configurable(metadata(docs::examples = 65536))]
62 send_buffer_bytes: Option<usize>,
63}
64
65impl UdpSinkConfig {
66 pub const fn from_address(address: String) -> Self {
67 Self {
68 address,
69 send_buffer_bytes: None,
70 }
71 }
72
73 fn build_connector(&self) -> crate::Result<UdpConnector> {
74 let uri = self.address.parse::<http::Uri>()?;
75 let host = uri.host().ok_or(SinkBuildError::MissingHost)?.to_string();
76 let port = uri.port_u16().ok_or(SinkBuildError::MissingPort)?;
77 Ok(UdpConnector::new(host, port, self.send_buffer_bytes))
78 }
79
80 pub fn build(
81 &self,
82 transformer: Transformer,
83 encoder: impl Encoder<Event, Error = vector_lib::codecs::encoding::Error>
84 + Clone
85 + Send
86 + Sync
87 + 'static,
88 chunker: Option<Chunker>,
89 ) -> crate::Result<(VectorSink, Healthcheck)> {
90 let connector = self.build_connector()?;
91 let sink = UdpSink::new(connector.clone(), transformer, encoder, chunker);
92 Ok((
93 VectorSink::from_event_streamsink(sink),
94 async move { connector.healthcheck().await }.boxed(),
95 ))
96 }
97}
98
99#[derive(Clone)]
100struct UdpConnector {
101 host: String,
102 port: u16,
103 send_buffer_bytes: Option<usize>,
104}
105
106impl UdpConnector {
107 const fn new(host: String, port: u16, send_buffer_bytes: Option<usize>) -> Self {
108 Self {
109 host,
110 port,
111 send_buffer_bytes,
112 }
113 }
114
115 const fn fresh_backoff() -> ExponentialBackoff {
116 ExponentialBackoff::from_millis(2)
118 .factor(250)
119 .max_delay(Duration::from_secs(60))
120 }
121
122 async fn connect(&self) -> Result<UdpSocket, UdpError> {
123 let ip = dns::Resolver
124 .lookup_ip(self.host.clone())
125 .await
126 .context(DnsSnafu)?
127 .next()
128 .ok_or(UdpError::NoAddresses)?;
129
130 let addr = SocketAddr::new(ip, self.port);
131 let bind_address = find_bind_address(&addr);
132
133 let socket = UdpSocket::bind(bind_address).await.context(BindSnafu)?;
134
135 if let Some(send_buffer_bytes) = self.send_buffer_bytes
136 && let Err(error) = net::set_send_buffer_size(&socket, send_buffer_bytes)
137 {
138 warn!(message = "Failed configuring send buffer size on UDP socket.", %error);
139 }
140
141 socket.connect(addr).await.context(ConnectSnafu)?;
142
143 Ok(socket)
144 }
145
146 async fn connect_backoff(&self) -> UdpSocket {
147 let mut backoff = Self::fresh_backoff();
148 loop {
149 match self.connect().await {
150 Ok(socket) => {
151 emit!(UdpSocketConnectionEstablished {});
152 return socket;
153 }
154 Err(error) => {
155 emit!(UdpSocketOutgoingConnectionError { error });
156 sleep(backoff.next().unwrap()).await;
157 }
158 }
159 }
160 }
161
162 async fn healthcheck(&self) -> crate::Result<()> {
163 self.connect().await.map(|_| ()).map_err(Into::into)
164 }
165}
166
167struct UdpSink<E>
168where
169 E: Encoder<Event, Error = vector_lib::codecs::encoding::Error> + Clone + Send + Sync,
170{
171 connector: UdpConnector,
172 transformer: Transformer,
173 encoder: E,
174 chunker: Option<Chunker>,
175 bytes_sent: Registered<BytesSent>,
176}
177
178impl<E> UdpSink<E>
179where
180 E: Encoder<Event, Error = vector_lib::codecs::encoding::Error> + Clone + Send + Sync,
181{
182 fn new(
183 connector: UdpConnector,
184 transformer: Transformer,
185 encoder: E,
186 chunker: Option<Chunker>,
187 ) -> Self {
188 Self {
189 connector,
190 transformer,
191 encoder,
192 chunker,
193 bytes_sent: register!(BytesSent::from(Protocol::UDP)),
194 }
195 }
196}
197
198#[async_trait]
199impl<E> StreamSink<Event> for UdpSink<E>
200where
201 E: Encoder<Event, Error = vector_lib::codecs::encoding::Error> + Clone + Send + Sync,
202{
203 async fn run(self: Box<Self>, input: BoxStream<'_, Event>) -> Result<(), ()> {
204 let mut input = input.peekable();
205
206 let mut encoder = self.encoder.clone();
207 let chunker = self.chunker.clone();
208 while Pin::new(&mut input).peek().await.is_some() {
209 let socket = self.connector.connect_backoff().await;
210 send_datagrams(
211 &mut input,
212 DatagramSocket::Udp(socket),
213 &self.transformer,
214 &mut encoder,
215 &chunker,
216 &self.bytes_sent,
217 )
218 .await;
219 }
220
221 Ok(())
222 }
223}
224
225pub(super) const fn find_bind_address(remote_addr: &SocketAddr) -> SocketAddr {
226 match remote_addr {
227 SocketAddr::V4(_) => SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), 0),
228 SocketAddr::V6(_) => SocketAddr::new(IpAddr::V6(Ipv6Addr::UNSPECIFIED), 0),
229 }
230}