vector/sinks/util/
datagram.rs

1#[cfg(unix)]
2use std::path::PathBuf;
3
4use bytes::BytesMut;
5use futures::{stream::BoxStream, StreamExt};
6use futures_util::stream::Peekable;
7use tokio::net::UdpSocket;
8#[cfg(unix)]
9use tokio::net::UnixDatagram;
10use tokio_util::codec::Encoder;
11use vector_lib::internal_event::RegisterInternalEvent;
12use vector_lib::internal_event::{ByteSize, BytesSent, InternalEventHandle};
13use vector_lib::EstimatedJsonEncodedSizeOf;
14
15use crate::{
16    codecs::Transformer,
17    event::{Event, EventStatus, Finalizable},
18    internal_events::{SocketEventsSent, SocketMode, SocketSendError, UdpSendIncompleteError},
19};
20
21#[cfg(unix)]
22use crate::internal_events::{UnixSendIncompleteError, UnixSocketSendError};
23
24pub enum DatagramSocket {
25    Udp(UdpSocket),
26    #[cfg(unix)]
27    Unix(UnixDatagram, PathBuf),
28}
29
30pub async fn send_datagrams<E: Encoder<Event, Error = vector_lib::codecs::encoding::Error>>(
31    input: &mut Peekable<BoxStream<'_, Event>>,
32    mut socket: DatagramSocket,
33    transformer: &Transformer,
34    encoder: &mut E,
35    bytes_sent: &<BytesSent as RegisterInternalEvent>::Handle,
36) {
37    while let Some(mut event) = input.next().await {
38        let byte_size = event.estimated_json_encoded_size_of();
39
40        transformer.transform(&mut event);
41
42        let finalizers = event.take_finalizers();
43        let mut bytes = BytesMut::new();
44
45        // Errors are handled by `Encoder`.
46        if encoder.encode(event, &mut bytes).is_err() {
47            continue;
48        }
49
50        match send_datagram(&mut socket, &bytes).await {
51            Ok(()) => {
52                emit!(SocketEventsSent {
53                    mode: match socket {
54                        DatagramSocket::Udp(_) => SocketMode::Udp,
55                        #[cfg(unix)]
56                        DatagramSocket::Unix(..) => SocketMode::Unix,
57                    },
58                    count: 1,
59                    byte_size,
60                });
61
62                bytes_sent.emit(ByteSize(bytes.len()));
63                finalizers.update_status(EventStatus::Delivered);
64            }
65            Err(error) => {
66                match socket {
67                    DatagramSocket::Udp(_) => emit!(SocketSendError {
68                        mode: SocketMode::Udp,
69                        error
70                    }),
71                    #[cfg(unix)]
72                    DatagramSocket::Unix(_, path) => {
73                        emit!(UnixSocketSendError {
74                            path: path.as_path(),
75                            error: &error
76                        })
77                    }
78                };
79                finalizers.update_status(EventStatus::Errored);
80                return;
81            }
82        }
83    }
84}
85
86async fn send_datagram(socket: &mut DatagramSocket, buf: &[u8]) -> tokio::io::Result<()> {
87    let sent = match socket {
88        DatagramSocket::Udp(udp) => udp.send(buf).await,
89        #[cfg(unix)]
90        DatagramSocket::Unix(uds, _) => uds.send(buf).await,
91    }?;
92    if sent != buf.len() {
93        match socket {
94            DatagramSocket::Udp(_) => emit!(UdpSendIncompleteError {
95                data_size: buf.len(),
96                sent,
97            }),
98            #[cfg(unix)]
99            DatagramSocket::Unix(..) => emit!(UnixSendIncompleteError {
100                data_size: buf.len(),
101                sent,
102            }),
103        }
104    }
105    Ok(())
106}