vector/sinks/util/
udp.rs

1use std::{
2    net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr},
3    pin::Pin,
4};
5
6use async_trait::async_trait;
7use futures::{FutureExt, StreamExt, stream::BoxStream};
8use snafu::{ResultExt, Snafu};
9use tokio::{net::UdpSocket, time::sleep};
10use tokio_util::codec::Encoder;
11use vector_lib::{
12    codecs::encoding::Chunker,
13    configurable::configurable_component,
14    internal_event::{BytesSent, Protocol, Registered},
15};
16
17use super::{
18    SinkBuildError,
19    datagram::{DatagramSocket, send_datagrams},
20};
21use crate::{
22    codecs::Transformer,
23    common::backoff::ExponentialBackoff,
24    dns,
25    event::Event,
26    internal_events::{UdpSocketConnectionEstablished, UdpSocketOutgoingConnectionError},
27    net,
28    sinks::{Healthcheck, VectorSink, util::StreamSink},
29};
30
/// Errors that can occur while resolving the remote host and setting up the outgoing UDP socket.
#[derive(Debug, Snafu)]
pub enum UdpError {
    /// Binding the local socket failed; attached to the result of `UdpSocket::bind` in
    /// `UdpConnector::connect`.
    #[snafu(display("Failed to create UDP listener socket, error = {:?}.", source))]
    BindError { source: std::io::Error },
    /// Associating the bound socket with the remote address (`socket.connect`) failed.
    #[snafu(display("Connect error: {}", source))]
    ConnectError { source: std::io::Error },
    /// The DNS lookup succeeded but yielded no IP address at all.
    #[snafu(display("No addresses returned."))]
    NoAddresses,
    /// The DNS lookup for the configured host failed.
    #[snafu(display("Unable to resolve DNS: {}", source))]
    DnsError { source: crate::dns::DnsError },
}
42
// NOTE(review): the `///` comments and `#[configurable(...)]` metadata on this struct are
// presumably consumed by `#[configurable_component]` when generating the configuration docs, so
// they are kept verbatim here — confirm before rewording any of them.
/// A UDP sink.
#[configurable_component]
#[derive(Clone, Debug)]
pub struct UdpSinkConfig {
    /// The address to connect to.
    ///
    /// Both IP address and hostname are accepted formats.
    ///
    /// The address _must_ include a port.
    // `build_connector` parses this value as an `http::Uri` and fails the build when either the
    // host or the port is missing.
    // NOTE(review): `333` is outside the 0-255 range of an IPv4 octet, so the first example is
    // not a valid IP address — confirm whether it is deliberately fake.
    #[configurable(metadata(docs::examples = "92.12.333.224:5000"))]
    #[configurable(metadata(docs::examples = "https://somehost:5000"))]
    address: String,

    /// The size of the socket's send buffer.
    ///
    /// If set, the value of the setting is passed via the `SO_SNDBUF` option.
    // Applied in `UdpConnector::connect` through `net::set_send_buffer_size`; a failure there is
    // only logged as a warning and does not abort the connection attempt.
    #[configurable(metadata(docs::type_unit = "bytes"))]
    #[configurable(metadata(docs::examples = 65536))]
    send_buffer_bytes: Option<usize>,
}
63
64impl UdpSinkConfig {
65    pub const fn from_address(address: String) -> Self {
66        Self {
67            address,
68            send_buffer_bytes: None,
69        }
70    }
71
72    fn build_connector(&self) -> crate::Result<UdpConnector> {
73        let uri = self.address.parse::<http::Uri>()?;
74        let host = uri.host().ok_or(SinkBuildError::MissingHost)?.to_string();
75        let port = uri.port_u16().ok_or(SinkBuildError::MissingPort)?;
76        Ok(UdpConnector::new(host, port, self.send_buffer_bytes))
77    }
78
79    pub fn build(
80        &self,
81        transformer: Transformer,
82        encoder: impl Encoder<Event, Error = vector_lib::codecs::encoding::Error>
83        + Clone
84        + Send
85        + Sync
86        + 'static,
87        chunker: Option<Chunker>,
88    ) -> crate::Result<(VectorSink, Healthcheck)> {
89        let connector = self.build_connector()?;
90        let sink = UdpSink::new(connector.clone(), transformer, encoder, chunker);
91        Ok((
92            VectorSink::from_event_streamsink(sink),
93            async move { connector.healthcheck().await }.boxed(),
94        ))
95    }
96}
97
/// Connection parameters for the remote UDP endpoint; cloned so the sink and the healthcheck each
/// hold their own copy.
#[derive(Clone)]
struct UdpConnector {
    /// Host part of the configured address; resolved through DNS on every connection attempt.
    host: String,
    /// Remote port to connect to.
    port: u16,
    /// Optional send buffer size to request on each newly bound socket.
    send_buffer_bytes: Option<usize>,
}
104
105impl UdpConnector {
106    const fn new(host: String, port: u16, send_buffer_bytes: Option<usize>) -> Self {
107        Self {
108            host,
109            port,
110            send_buffer_bytes,
111        }
112    }
113
114    fn fresh_backoff() -> ExponentialBackoff {
115        // TODO: make configurable
116        ExponentialBackoff::default()
117    }
118
119    async fn connect(&self) -> Result<UdpSocket, UdpError> {
120        let ip = dns::Resolver
121            .lookup_ip(self.host.clone())
122            .await
123            .context(DnsSnafu)?
124            .next()
125            .ok_or(UdpError::NoAddresses)?;
126
127        let addr = SocketAddr::new(ip, self.port);
128        let bind_address = find_bind_address(&addr);
129
130        let socket = UdpSocket::bind(bind_address).await.context(BindSnafu)?;
131
132        if let Some(send_buffer_bytes) = self.send_buffer_bytes
133            && let Err(error) = net::set_send_buffer_size(&socket, send_buffer_bytes)
134        {
135            warn!(message = "Failed configuring send buffer size on UDP socket.", %error);
136        }
137
138        socket.connect(addr).await.context(ConnectSnafu)?;
139
140        Ok(socket)
141    }
142
143    async fn connect_backoff(&self) -> UdpSocket {
144        let mut backoff = Self::fresh_backoff();
145        loop {
146            match self.connect().await {
147                Ok(socket) => {
148                    emit!(UdpSocketConnectionEstablished {});
149                    return socket;
150                }
151                Err(error) => {
152                    emit!(UdpSocketOutgoingConnectionError { error });
153                    sleep(backoff.next().unwrap()).await;
154                }
155            }
156        }
157    }
158
159    async fn healthcheck(&self) -> crate::Result<()> {
160        self.connect().await.map(|_| ()).map_err(Into::into)
161    }
162}
163
/// Stream sink that forwards events as UDP datagrams, using encoder `E` to serialize them.
struct UdpSink<E>
where
    E: Encoder<Event, Error = vector_lib::codecs::encoding::Error> + Clone + Send + Sync,
{
    /// Produces a connected socket whenever the sink needs a (new) one.
    connector: UdpConnector,
    /// Handed to `send_datagrams` along with the encoder for every batch of work.
    transformer: Transformer,
    /// Encoder template; `run` works on a clone of it.
    encoder: E,
    /// Optional chunker handed to `send_datagrams`.
    chunker: Option<Chunker>,
    /// Registered handle for the `BytesSent` internal event, tagged with the UDP protocol.
    bytes_sent: Registered<BytesSent>,
}
174
175impl<E> UdpSink<E>
176where
177    E: Encoder<Event, Error = vector_lib::codecs::encoding::Error> + Clone + Send + Sync,
178{
179    fn new(
180        connector: UdpConnector,
181        transformer: Transformer,
182        encoder: E,
183        chunker: Option<Chunker>,
184    ) -> Self {
185        Self {
186            connector,
187            transformer,
188            encoder,
189            chunker,
190            bytes_sent: register!(BytesSent::from(Protocol::UDP)),
191        }
192    }
193}
194
195#[async_trait]
196impl<E> StreamSink<Event> for UdpSink<E>
197where
198    E: Encoder<Event, Error = vector_lib::codecs::encoding::Error> + Clone + Send + Sync,
199{
200    async fn run(self: Box<Self>, input: BoxStream<'_, Event>) -> Result<(), ()> {
201        let mut input = input.peekable();
202
203        let mut encoder = self.encoder.clone();
204        let chunker = self.chunker.clone();
205        while Pin::new(&mut input).peek().await.is_some() {
206            let socket = self.connector.connect_backoff().await;
207            send_datagrams(
208                &mut input,
209                DatagramSocket::Udp(socket),
210                &self.transformer,
211                &mut encoder,
212                &chunker,
213                &self.bytes_sent,
214            )
215            .await;
216        }
217
218        Ok(())
219    }
220}
221
222pub(super) const fn find_bind_address(remote_addr: &SocketAddr) -> SocketAddr {
223    match remote_addr {
224        SocketAddr::V4(_) => SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), 0),
225        SocketAddr::V6(_) => SocketAddr::new(IpAddr::V6(Ipv6Addr::UNSPECIFIED), 0),
226    }
227}