vector/sinks/util/
udp.rs

1use std::{
2    net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr},
3    pin::Pin,
4    time::Duration,
5};
6
7use async_trait::async_trait;
8use futures::{FutureExt, StreamExt, stream::BoxStream};
9use snafu::{ResultExt, Snafu};
10use tokio::{net::UdpSocket, time::sleep};
11use tokio_util::codec::Encoder;
12use vector_lib::{
13    codecs::encoding::Chunker,
14    configurable::configurable_component,
15    internal_event::{BytesSent, Protocol, Registered},
16};
17
18use super::{
19    SinkBuildError,
20    datagram::{DatagramSocket, send_datagrams},
21};
22use crate::{
23    codecs::Transformer,
24    common::backoff::ExponentialBackoff,
25    dns,
26    event::Event,
27    internal_events::{UdpSocketConnectionEstablished, UdpSocketOutgoingConnectionError},
28    net,
29    sinks::{Healthcheck, VectorSink, util::StreamSink},
30};
31
32#[derive(Debug, Snafu)]
33pub enum UdpError {
34    #[snafu(display("Failed to create UDP listener socket, error = {:?}.", source))]
35    BindError { source: std::io::Error },
36    #[snafu(display("Connect error: {}", source))]
37    ConnectError { source: std::io::Error },
38    #[snafu(display("No addresses returned."))]
39    NoAddresses,
40    #[snafu(display("Unable to resolve DNS: {}", source))]
41    DnsError { source: crate::dns::DnsError },
42}
43
44/// A UDP sink.
45#[configurable_component]
46#[derive(Clone, Debug)]
47pub struct UdpSinkConfig {
48    /// The address to connect to.
49    ///
50    /// Both IP address and hostname are accepted formats.
51    ///
52    /// The address _must_ include a port.
53    #[configurable(metadata(docs::examples = "92.12.333.224:5000"))]
54    #[configurable(metadata(docs::examples = "https://somehost:5000"))]
55    address: String,
56
57    /// The size of the socket's send buffer.
58    ///
59    /// If set, the value of the setting is passed via the `SO_SNDBUF` option.
60    #[configurable(metadata(docs::type_unit = "bytes"))]
61    #[configurable(metadata(docs::examples = 65536))]
62    send_buffer_bytes: Option<usize>,
63}
64
65impl UdpSinkConfig {
66    pub const fn from_address(address: String) -> Self {
67        Self {
68            address,
69            send_buffer_bytes: None,
70        }
71    }
72
73    fn build_connector(&self) -> crate::Result<UdpConnector> {
74        let uri = self.address.parse::<http::Uri>()?;
75        let host = uri.host().ok_or(SinkBuildError::MissingHost)?.to_string();
76        let port = uri.port_u16().ok_or(SinkBuildError::MissingPort)?;
77        Ok(UdpConnector::new(host, port, self.send_buffer_bytes))
78    }
79
80    pub fn build(
81        &self,
82        transformer: Transformer,
83        encoder: impl Encoder<Event, Error = vector_lib::codecs::encoding::Error>
84        + Clone
85        + Send
86        + Sync
87        + 'static,
88        chunker: Option<Chunker>,
89    ) -> crate::Result<(VectorSink, Healthcheck)> {
90        let connector = self.build_connector()?;
91        let sink = UdpSink::new(connector.clone(), transformer, encoder, chunker);
92        Ok((
93            VectorSink::from_event_streamsink(sink),
94            async move { connector.healthcheck().await }.boxed(),
95        ))
96    }
97}
98
/// Connection parameters used to open a connected UDP socket towards the remote peer.
#[derive(Clone)]
struct UdpConnector {
    /// Host handed to the DNS resolver on every connection attempt.
    host: String,
    /// Remote port combined with the resolved IP address.
    port: u16,
    /// Requested send buffer size; when `None` the socket's buffer size is left untouched.
    send_buffer_bytes: Option<usize>,
}
105
106impl UdpConnector {
107    const fn new(host: String, port: u16, send_buffer_bytes: Option<usize>) -> Self {
108        Self {
109            host,
110            port,
111            send_buffer_bytes,
112        }
113    }
114
115    const fn fresh_backoff() -> ExponentialBackoff {
116        // TODO: make configurable
117        ExponentialBackoff::from_millis(2)
118            .factor(250)
119            .max_delay(Duration::from_secs(60))
120    }
121
122    async fn connect(&self) -> Result<UdpSocket, UdpError> {
123        let ip = dns::Resolver
124            .lookup_ip(self.host.clone())
125            .await
126            .context(DnsSnafu)?
127            .next()
128            .ok_or(UdpError::NoAddresses)?;
129
130        let addr = SocketAddr::new(ip, self.port);
131        let bind_address = find_bind_address(&addr);
132
133        let socket = UdpSocket::bind(bind_address).await.context(BindSnafu)?;
134
135        if let Some(send_buffer_bytes) = self.send_buffer_bytes
136            && let Err(error) = net::set_send_buffer_size(&socket, send_buffer_bytes)
137        {
138            warn!(message = "Failed configuring send buffer size on UDP socket.", %error);
139        }
140
141        socket.connect(addr).await.context(ConnectSnafu)?;
142
143        Ok(socket)
144    }
145
146    async fn connect_backoff(&self) -> UdpSocket {
147        let mut backoff = Self::fresh_backoff();
148        loop {
149            match self.connect().await {
150                Ok(socket) => {
151                    emit!(UdpSocketConnectionEstablished {});
152                    return socket;
153                }
154                Err(error) => {
155                    emit!(UdpSocketOutgoingConnectionError { error });
156                    sleep(backoff.next().unwrap()).await;
157                }
158            }
159        }
160    }
161
162    async fn healthcheck(&self) -> crate::Result<()> {
163        self.connect().await.map(|_| ()).map_err(Into::into)
164    }
165}
166
167struct UdpSink<E>
168where
169    E: Encoder<Event, Error = vector_lib::codecs::encoding::Error> + Clone + Send + Sync,
170{
171    connector: UdpConnector,
172    transformer: Transformer,
173    encoder: E,
174    chunker: Option<Chunker>,
175    bytes_sent: Registered<BytesSent>,
176}
177
178impl<E> UdpSink<E>
179where
180    E: Encoder<Event, Error = vector_lib::codecs::encoding::Error> + Clone + Send + Sync,
181{
182    fn new(
183        connector: UdpConnector,
184        transformer: Transformer,
185        encoder: E,
186        chunker: Option<Chunker>,
187    ) -> Self {
188        Self {
189            connector,
190            transformer,
191            encoder,
192            chunker,
193            bytes_sent: register!(BytesSent::from(Protocol::UDP)),
194        }
195    }
196}
197
198#[async_trait]
199impl<E> StreamSink<Event> for UdpSink<E>
200where
201    E: Encoder<Event, Error = vector_lib::codecs::encoding::Error> + Clone + Send + Sync,
202{
203    async fn run(self: Box<Self>, input: BoxStream<'_, Event>) -> Result<(), ()> {
204        let mut input = input.peekable();
205
206        let mut encoder = self.encoder.clone();
207        let chunker = self.chunker.clone();
208        while Pin::new(&mut input).peek().await.is_some() {
209            let socket = self.connector.connect_backoff().await;
210            send_datagrams(
211                &mut input,
212                DatagramSocket::Udp(socket),
213                &self.transformer,
214                &mut encoder,
215                &chunker,
216                &self.bytes_sent,
217            )
218            .await;
219        }
220
221        Ok(())
222    }
223}
224
225pub(super) const fn find_bind_address(remote_addr: &SocketAddr) -> SocketAddr {
226    match remote_addr {
227        SocketAddr::V4(_) => SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), 0),
228        SocketAddr::V6(_) => SocketAddr::new(IpAddr::V6(Ipv6Addr::UNSPECIFIED), 0),
229    }
230}