1use std::{
2 net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr},
3 pin::Pin,
4};
5
6use async_trait::async_trait;
7use futures::{FutureExt, StreamExt, stream::BoxStream};
8use snafu::{ResultExt, Snafu};
9use tokio::{net::UdpSocket, time::sleep};
10use tokio_util::codec::Encoder;
11use vector_lib::{
12 codecs::encoding::Chunker,
13 configurable::configurable_component,
14 internal_event::{BytesSent, Protocol, Registered},
15};
16
17use super::{
18 SinkBuildError,
19 datagram::{DatagramSocket, send_datagrams},
20};
21use crate::{
22 codecs::Transformer,
23 common::backoff::ExponentialBackoff,
24 dns,
25 event::Event,
26 internal_events::{UdpSocketConnectionEstablished, UdpSocketOutgoingConnectionError},
27 net,
28 sinks::{Healthcheck, VectorSink, util::StreamSink},
29};
30
/// Errors that can occur while establishing the outgoing UDP socket.
#[derive(Debug, Snafu)]
pub enum UdpError {
    /// Binding the local UDP socket failed.
    #[snafu(display("Failed to create UDP listener socket, error = {:?}.", source))]
    BindError { source: std::io::Error },
    /// Connecting the bound socket to the remote address failed.
    #[snafu(display("Connect error: {}", source))]
    ConnectError { source: std::io::Error },
    /// The DNS lookup succeeded but yielded no addresses.
    #[snafu(display("No addresses returned."))]
    NoAddresses,
    /// The DNS lookup for the configured host failed.
    #[snafu(display("Unable to resolve DNS: {}", source))]
    DnsError { source: crate::dns::DnsError },
}
42
#[configurable_component]
/// Configuration for a UDP sink.
#[derive(Clone, Debug)]
pub struct UdpSinkConfig {
    /// The address to send datagrams to.
    ///
    /// The value is parsed as a URI and must contain both a host and a port;
    /// building the sink fails otherwise.
    #[configurable(metadata(docs::examples = "92.12.333.224:5000"))]
    #[configurable(metadata(docs::examples = "https://somehost:5000"))]
    address: String,

    /// The requested size of the socket's send buffer, in bytes.
    ///
    /// When unset, the send buffer size is not configured on the socket.
    #[configurable(metadata(docs::type_unit = "bytes"))]
    #[configurable(metadata(docs::examples = 65536))]
    send_buffer_bytes: Option<usize>,
}
63
64impl UdpSinkConfig {
65 pub const fn from_address(address: String) -> Self {
66 Self {
67 address,
68 send_buffer_bytes: None,
69 }
70 }
71
72 fn build_connector(&self) -> crate::Result<UdpConnector> {
73 let uri = self.address.parse::<http::Uri>()?;
74 let host = uri.host().ok_or(SinkBuildError::MissingHost)?.to_string();
75 let port = uri.port_u16().ok_or(SinkBuildError::MissingPort)?;
76 Ok(UdpConnector::new(host, port, self.send_buffer_bytes))
77 }
78
79 pub fn build(
80 &self,
81 transformer: Transformer,
82 encoder: impl Encoder<Event, Error = vector_lib::codecs::encoding::Error>
83 + Clone
84 + Send
85 + Sync
86 + 'static,
87 chunker: Option<Chunker>,
88 ) -> crate::Result<(VectorSink, Healthcheck)> {
89 let connector = self.build_connector()?;
90 let sink = UdpSink::new(connector.clone(), transformer, encoder, chunker);
91 Ok((
92 VectorSink::from_event_streamsink(sink),
93 async move { connector.healthcheck().await }.boxed(),
94 ))
95 }
96}
97
/// Connection parameters used to open a UDP socket connected to a remote peer.
#[derive(Clone)]
struct UdpConnector {
    /// Host to resolve via DNS on every connection attempt.
    host: String,
    /// Remote port to connect to.
    port: u16,
    /// Optional send buffer size applied to the socket after binding.
    send_buffer_bytes: Option<usize>,
}
104
105impl UdpConnector {
106 const fn new(host: String, port: u16, send_buffer_bytes: Option<usize>) -> Self {
107 Self {
108 host,
109 port,
110 send_buffer_bytes,
111 }
112 }
113
114 fn fresh_backoff() -> ExponentialBackoff {
115 ExponentialBackoff::default()
117 }
118
119 async fn connect(&self) -> Result<UdpSocket, UdpError> {
120 let ip = dns::Resolver
121 .lookup_ip(self.host.clone())
122 .await
123 .context(DnsSnafu)?
124 .next()
125 .ok_or(UdpError::NoAddresses)?;
126
127 let addr = SocketAddr::new(ip, self.port);
128 let bind_address = find_bind_address(&addr);
129
130 let socket = UdpSocket::bind(bind_address).await.context(BindSnafu)?;
131
132 if let Some(send_buffer_bytes) = self.send_buffer_bytes
133 && let Err(error) = net::set_send_buffer_size(&socket, send_buffer_bytes)
134 {
135 warn!(message = "Failed configuring send buffer size on UDP socket.", %error);
136 }
137
138 socket.connect(addr).await.context(ConnectSnafu)?;
139
140 Ok(socket)
141 }
142
143 async fn connect_backoff(&self) -> UdpSocket {
144 let mut backoff = Self::fresh_backoff();
145 loop {
146 match self.connect().await {
147 Ok(socket) => {
148 emit!(UdpSocketConnectionEstablished {});
149 return socket;
150 }
151 Err(error) => {
152 emit!(UdpSocketOutgoingConnectionError { error });
153 sleep(backoff.next().unwrap()).await;
154 }
155 }
156 }
157 }
158
159 async fn healthcheck(&self) -> crate::Result<()> {
160 self.connect().await.map(|_| ()).map_err(Into::into)
161 }
162}
163
/// Sink state: everything needed to connect and to hand events to
/// `send_datagrams`.
struct UdpSink<E>
where
    E: Encoder<Event, Error = vector_lib::codecs::encoding::Error> + Clone + Send + Sync,
{
    /// Opens (and re-opens) the UDP socket.
    connector: UdpConnector,
    /// Passed by reference to `send_datagrams`.
    transformer: Transformer,
    /// Event encoder; cloned once when the sink starts running.
    encoder: E,
    /// Optional chunker; cloned once when the sink starts running.
    chunker: Option<Chunker>,
    /// `BytesSent` handle registered for the UDP protocol.
    bytes_sent: Registered<BytesSent>,
}
174
175impl<E> UdpSink<E>
176where
177 E: Encoder<Event, Error = vector_lib::codecs::encoding::Error> + Clone + Send + Sync,
178{
179 fn new(
180 connector: UdpConnector,
181 transformer: Transformer,
182 encoder: E,
183 chunker: Option<Chunker>,
184 ) -> Self {
185 Self {
186 connector,
187 transformer,
188 encoder,
189 chunker,
190 bytes_sent: register!(BytesSent::from(Protocol::UDP)),
191 }
192 }
193}
194
195#[async_trait]
196impl<E> StreamSink<Event> for UdpSink<E>
197where
198 E: Encoder<Event, Error = vector_lib::codecs::encoding::Error> + Clone + Send + Sync,
199{
200 async fn run(self: Box<Self>, input: BoxStream<'_, Event>) -> Result<(), ()> {
201 let mut input = input.peekable();
202
203 let mut encoder = self.encoder.clone();
204 let chunker = self.chunker.clone();
205 while Pin::new(&mut input).peek().await.is_some() {
206 let socket = self.connector.connect_backoff().await;
207 send_datagrams(
208 &mut input,
209 DatagramSocket::Udp(socket),
210 &self.transformer,
211 &mut encoder,
212 &chunker,
213 &self.bytes_sent,
214 )
215 .await;
216 }
217
218 Ok(())
219 }
220}
221
222pub(super) const fn find_bind_address(remote_addr: &SocketAddr) -> SocketAddr {
223 match remote_addr {
224 SocketAddr::V4(_) => SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), 0),
225 SocketAddr::V6(_) => SocketAddr::new(IpAddr::V6(Ipv6Addr::UNSPECIFIED), 0),
226 }
227}