vector/sinks/util/
udp.rs

1use std::{
2    net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr},
3    pin::Pin,
4    time::Duration,
5};
6
7use async_trait::async_trait;
8use futures::{stream::BoxStream, FutureExt, StreamExt};
9use snafu::{ResultExt, Snafu};
10use tokio::{net::UdpSocket, time::sleep};
11use tokio_util::codec::Encoder;
12use vector_lib::configurable::configurable_component;
13use vector_lib::internal_event::{BytesSent, Protocol, Registered};
14
15use super::{
16    datagram::{send_datagrams, DatagramSocket},
17    SinkBuildError,
18};
19use crate::{
20    codecs::Transformer,
21    dns,
22    event::Event,
23    internal_events::{UdpSocketConnectionEstablished, UdpSocketOutgoingConnectionError},
24    net,
25    sinks::{
26        util::{retries::ExponentialBackoff, StreamSink},
27        Healthcheck, VectorSink,
28    },
29};
30
/// Errors that can occur while establishing the outgoing UDP socket.
///
/// Every variant is produced by `UdpConnector::connect`.
#[derive(Debug, Snafu)]
pub enum UdpError {
    /// Binding the local socket failed.
    #[snafu(display("Failed to create UDP listener socket, error = {:?}.", source))]
    BindError { source: std::io::Error },
    /// Connecting the bound socket to the resolved remote address failed.
    #[snafu(display("Connect error: {}", source))]
    ConnectError { source: std::io::Error },
    /// The DNS lookup succeeded but yielded no address.
    #[snafu(display("No addresses returned."))]
    NoAddresses,
    /// The DNS lookup for the configured host failed.
    #[snafu(display("Unable to resolve DNS: {}", source))]
    DnsError { source: crate::dns::DnsError },
}
42
/// A UDP sink.
// NOTE(review): the `///` comments and `docs::examples` metadata on this type are presumably
// consumed by `#[configurable_component]` to generate user-facing documentation — confirm
// before rewording any of them.
#[configurable_component]
#[derive(Clone, Debug)]
pub struct UdpSinkConfig {
    /// The address to connect to.
    ///
    /// Both IP address and hostname are accepted formats.
    ///
    /// The address _must_ include a port.
    // `build_connector` parses this value as an `http::Uri`, reads only its host and port, and
    // fails when either one is absent.
    // NOTE(review): the first example is not a valid IPv4 address (the octet `333` exceeds 255),
    // and the second carries an `https://` scheme that `build_connector` never reads — consider
    // replacing both examples.
    #[configurable(metadata(docs::examples = "92.12.333.224:5000"))]
    #[configurable(metadata(docs::examples = "https://somehost:5000"))]
    address: String,

    /// The size of the socket's send buffer.
    ///
    /// If set, the value of the setting is passed via the `SO_SNDBUF` option.
    // Applied in `UdpConnector::connect`; a failure to apply it is logged as a warning and does
    // not abort the connection attempt.
    #[configurable(metadata(docs::type_unit = "bytes"))]
    #[configurable(metadata(docs::examples = 65536))]
    send_buffer_bytes: Option<usize>,
}
63
64impl UdpSinkConfig {
65    pub const fn from_address(address: String) -> Self {
66        Self {
67            address,
68            send_buffer_bytes: None,
69        }
70    }
71
72    fn build_connector(&self) -> crate::Result<UdpConnector> {
73        let uri = self.address.parse::<http::Uri>()?;
74        let host = uri.host().ok_or(SinkBuildError::MissingHost)?.to_string();
75        let port = uri.port_u16().ok_or(SinkBuildError::MissingPort)?;
76        Ok(UdpConnector::new(host, port, self.send_buffer_bytes))
77    }
78
79    pub fn build(
80        &self,
81        transformer: Transformer,
82        encoder: impl Encoder<Event, Error = vector_lib::codecs::encoding::Error>
83            + Clone
84            + Send
85            + Sync
86            + 'static,
87    ) -> crate::Result<(VectorSink, Healthcheck)> {
88        let connector = self.build_connector()?;
89        let sink = UdpSink::new(connector.clone(), transformer, encoder);
90        Ok((
91            VectorSink::from_event_streamsink(sink),
92            async move { connector.healthcheck().await }.boxed(),
93        ))
94    }
95}
96
/// The parameters needed to open a UDP socket towards the configured destination.
#[derive(Clone)]
struct UdpConnector {
    /// Host to send to; it is looked up through `dns::Resolver` on every `connect` call.
    host: String,
    /// Destination port, combined with the resolved IP address to form the remote address.
    port: u16,
    /// Requested send buffer size; when `None`, `connect` does not touch the socket's buffer.
    send_buffer_bytes: Option<usize>,
}
103
104impl UdpConnector {
105    const fn new(host: String, port: u16, send_buffer_bytes: Option<usize>) -> Self {
106        Self {
107            host,
108            port,
109            send_buffer_bytes,
110        }
111    }
112
113    const fn fresh_backoff() -> ExponentialBackoff {
114        // TODO: make configurable
115        ExponentialBackoff::from_millis(2)
116            .factor(250)
117            .max_delay(Duration::from_secs(60))
118    }
119
120    async fn connect(&self) -> Result<UdpSocket, UdpError> {
121        let ip = dns::Resolver
122            .lookup_ip(self.host.clone())
123            .await
124            .context(DnsSnafu)?
125            .next()
126            .ok_or(UdpError::NoAddresses)?;
127
128        let addr = SocketAddr::new(ip, self.port);
129        let bind_address = find_bind_address(&addr);
130
131        let socket = UdpSocket::bind(bind_address).await.context(BindSnafu)?;
132
133        if let Some(send_buffer_bytes) = self.send_buffer_bytes {
134            if let Err(error) = net::set_send_buffer_size(&socket, send_buffer_bytes) {
135                warn!(message = "Failed configuring send buffer size on UDP socket.", %error);
136            }
137        }
138
139        socket.connect(addr).await.context(ConnectSnafu)?;
140
141        Ok(socket)
142    }
143
144    async fn connect_backoff(&self) -> UdpSocket {
145        let mut backoff = Self::fresh_backoff();
146        loop {
147            match self.connect().await {
148                Ok(socket) => {
149                    emit!(UdpSocketConnectionEstablished {});
150                    return socket;
151                }
152                Err(error) => {
153                    emit!(UdpSocketOutgoingConnectionError { error });
154                    sleep(backoff.next().unwrap()).await;
155                }
156            }
157        }
158    }
159
160    async fn healthcheck(&self) -> crate::Result<()> {
161        self.connect().await.map(|_| ()).map_err(Into::into)
162    }
163}
164
/// A stream sink that sends events over UDP sockets obtained from a [`UdpConnector`].
struct UdpSink<E>
where
    E: Encoder<Event, Error = vector_lib::codecs::encoding::Error> + Clone + Send + Sync,
{
    /// Used by `run` to (re)establish the outgoing socket.
    connector: UdpConnector,
    /// Passed by reference to `send_datagrams` on every call.
    transformer: Transformer,
    /// Cloned once at the start of `run`; the clone is what `send_datagrams` receives.
    encoder: E,
    /// Handle for the `BytesSent` internal event, registered with `Protocol::UDP`.
    bytes_sent: Registered<BytesSent>,
}
174
175impl<E> UdpSink<E>
176where
177    E: Encoder<Event, Error = vector_lib::codecs::encoding::Error> + Clone + Send + Sync,
178{
179    fn new(connector: UdpConnector, transformer: Transformer, encoder: E) -> Self {
180        Self {
181            connector,
182            transformer,
183            encoder,
184            bytes_sent: register!(BytesSent::from(Protocol::UDP)),
185        }
186    }
187}
188
189#[async_trait]
190impl<E> StreamSink<Event> for UdpSink<E>
191where
192    E: Encoder<Event, Error = vector_lib::codecs::encoding::Error> + Clone + Send + Sync,
193{
194    async fn run(self: Box<Self>, input: BoxStream<'_, Event>) -> Result<(), ()> {
195        let mut input = input.peekable();
196
197        let mut encoder = self.encoder.clone();
198        while Pin::new(&mut input).peek().await.is_some() {
199            let socket = self.connector.connect_backoff().await;
200            send_datagrams(
201                &mut input,
202                DatagramSocket::Udp(socket),
203                &self.transformer,
204                &mut encoder,
205                &self.bytes_sent,
206            )
207            .await;
208        }
209
210        Ok(())
211    }
212}
213
214pub(super) const fn find_bind_address(remote_addr: &SocketAddr) -> SocketAddr {
215    match remote_addr {
216        SocketAddr::V4(_) => SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), 0),
217        SocketAddr::V6(_) => SocketAddr::new(IpAddr::V6(Ipv6Addr::UNSPECIFIED), 0),
218    }
219}