vector/sinks/util/
udp.rs

1use std::{
2    net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr},
3    pin::Pin,
4    time::Duration,
5};
6
7use async_trait::async_trait;
8use futures::{stream::BoxStream, FutureExt, StreamExt};
9use snafu::{ResultExt, Snafu};
10use tokio::{net::UdpSocket, time::sleep};
11use tokio_util::codec::Encoder;
12use vector_lib::configurable::configurable_component;
13use vector_lib::internal_event::{BytesSent, Protocol, Registered};
14
15use super::{
16    datagram::{send_datagrams, DatagramSocket},
17    SinkBuildError,
18};
19use crate::{
20    codecs::Transformer,
21    common::backoff::ExponentialBackoff,
22    dns,
23    event::Event,
24    internal_events::{UdpSocketConnectionEstablished, UdpSocketOutgoingConnectionError},
25    net,
26    sinks::{util::StreamSink, Healthcheck, VectorSink},
27};
28
29#[derive(Debug, Snafu)]
30pub enum UdpError {
31    #[snafu(display("Failed to create UDP listener socket, error = {:?}.", source))]
32    BindError { source: std::io::Error },
33    #[snafu(display("Connect error: {}", source))]
34    ConnectError { source: std::io::Error },
35    #[snafu(display("No addresses returned."))]
36    NoAddresses,
37    #[snafu(display("Unable to resolve DNS: {}", source))]
38    DnsError { source: crate::dns::DnsError },
39}
40
41/// A UDP sink.
42#[configurable_component]
43#[derive(Clone, Debug)]
44pub struct UdpSinkConfig {
45    /// The address to connect to.
46    ///
47    /// Both IP address and hostname are accepted formats.
48    ///
49    /// The address _must_ include a port.
50    #[configurable(metadata(docs::examples = "92.12.333.224:5000"))]
51    #[configurable(metadata(docs::examples = "https://somehost:5000"))]
52    address: String,
53
54    /// The size of the socket's send buffer.
55    ///
56    /// If set, the value of the setting is passed via the `SO_SNDBUF` option.
57    #[configurable(metadata(docs::type_unit = "bytes"))]
58    #[configurable(metadata(docs::examples = 65536))]
59    send_buffer_bytes: Option<usize>,
60}
61
62impl UdpSinkConfig {
63    pub const fn from_address(address: String) -> Self {
64        Self {
65            address,
66            send_buffer_bytes: None,
67        }
68    }
69
70    fn build_connector(&self) -> crate::Result<UdpConnector> {
71        let uri = self.address.parse::<http::Uri>()?;
72        let host = uri.host().ok_or(SinkBuildError::MissingHost)?.to_string();
73        let port = uri.port_u16().ok_or(SinkBuildError::MissingPort)?;
74        Ok(UdpConnector::new(host, port, self.send_buffer_bytes))
75    }
76
77    pub fn build(
78        &self,
79        transformer: Transformer,
80        encoder: impl Encoder<Event, Error = vector_lib::codecs::encoding::Error>
81            + Clone
82            + Send
83            + Sync
84            + 'static,
85    ) -> crate::Result<(VectorSink, Healthcheck)> {
86        let connector = self.build_connector()?;
87        let sink = UdpSink::new(connector.clone(), transformer, encoder);
88        Ok((
89            VectorSink::from_event_streamsink(sink),
90            async move { connector.healthcheck().await }.boxed(),
91        ))
92    }
93}
94
95#[derive(Clone)]
96struct UdpConnector {
97    host: String,
98    port: u16,
99    send_buffer_bytes: Option<usize>,
100}
101
102impl UdpConnector {
103    const fn new(host: String, port: u16, send_buffer_bytes: Option<usize>) -> Self {
104        Self {
105            host,
106            port,
107            send_buffer_bytes,
108        }
109    }
110
111    const fn fresh_backoff() -> ExponentialBackoff {
112        // TODO: make configurable
113        ExponentialBackoff::from_millis(2)
114            .factor(250)
115            .max_delay(Duration::from_secs(60))
116    }
117
118    async fn connect(&self) -> Result<UdpSocket, UdpError> {
119        let ip = dns::Resolver
120            .lookup_ip(self.host.clone())
121            .await
122            .context(DnsSnafu)?
123            .next()
124            .ok_or(UdpError::NoAddresses)?;
125
126        let addr = SocketAddr::new(ip, self.port);
127        let bind_address = find_bind_address(&addr);
128
129        let socket = UdpSocket::bind(bind_address).await.context(BindSnafu)?;
130
131        if let Some(send_buffer_bytes) = self.send_buffer_bytes {
132            if let Err(error) = net::set_send_buffer_size(&socket, send_buffer_bytes) {
133                warn!(message = "Failed configuring send buffer size on UDP socket.", %error);
134            }
135        }
136
137        socket.connect(addr).await.context(ConnectSnafu)?;
138
139        Ok(socket)
140    }
141
142    async fn connect_backoff(&self) -> UdpSocket {
143        let mut backoff = Self::fresh_backoff();
144        loop {
145            match self.connect().await {
146                Ok(socket) => {
147                    emit!(UdpSocketConnectionEstablished {});
148                    return socket;
149                }
150                Err(error) => {
151                    emit!(UdpSocketOutgoingConnectionError { error });
152                    sleep(backoff.next().unwrap()).await;
153                }
154            }
155        }
156    }
157
158    async fn healthcheck(&self) -> crate::Result<()> {
159        self.connect().await.map(|_| ()).map_err(Into::into)
160    }
161}
162
163struct UdpSink<E>
164where
165    E: Encoder<Event, Error = vector_lib::codecs::encoding::Error> + Clone + Send + Sync,
166{
167    connector: UdpConnector,
168    transformer: Transformer,
169    encoder: E,
170    bytes_sent: Registered<BytesSent>,
171}
172
173impl<E> UdpSink<E>
174where
175    E: Encoder<Event, Error = vector_lib::codecs::encoding::Error> + Clone + Send + Sync,
176{
177    fn new(connector: UdpConnector, transformer: Transformer, encoder: E) -> Self {
178        Self {
179            connector,
180            transformer,
181            encoder,
182            bytes_sent: register!(BytesSent::from(Protocol::UDP)),
183        }
184    }
185}
186
187#[async_trait]
188impl<E> StreamSink<Event> for UdpSink<E>
189where
190    E: Encoder<Event, Error = vector_lib::codecs::encoding::Error> + Clone + Send + Sync,
191{
192    async fn run(self: Box<Self>, input: BoxStream<'_, Event>) -> Result<(), ()> {
193        let mut input = input.peekable();
194
195        let mut encoder = self.encoder.clone();
196        while Pin::new(&mut input).peek().await.is_some() {
197            let socket = self.connector.connect_backoff().await;
198            send_datagrams(
199                &mut input,
200                DatagramSocket::Udp(socket),
201                &self.transformer,
202                &mut encoder,
203                &self.bytes_sent,
204            )
205            .await;
206        }
207
208        Ok(())
209    }
210}
211
212pub(super) const fn find_bind_address(remote_addr: &SocketAddr) -> SocketAddr {
213    match remote_addr {
214        SocketAddr::V4(_) => SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), 0),
215        SocketAddr::V6(_) => SocketAddr::new(IpAddr::V6(Ipv6Addr::UNSPECIFIED), 0),
216    }
217}