vector/sinks/util/
datagram.rs

1use bytes::BytesMut;
2use futures::{StreamExt, stream::BoxStream};
3use futures_util::stream::Peekable;
4#[cfg(unix)]
5use std::path::PathBuf;
6use tokio::net::UdpSocket;
7#[cfg(unix)]
8use tokio::net::UnixDatagram;
9use tokio_util::codec::Encoder;
10use vector_lib::{
11    codecs::encoding::{Chunker, Chunking},
12    internal_event::{ByteSize, BytesSent, InternalEventHandle, RegisterInternalEvent},
13};
14
15#[cfg(unix)]
16use crate::internal_events::{UnixSendIncompleteError, UnixSocketSendError};
17use crate::{
18    codecs::Transformer,
19    event::{Event, EventStatus, Finalizable},
20    internal_events::{
21        SocketEventsSent, SocketMode, SocketSendError, UdpChunkingError, UdpSendIncompleteError,
22    },
23};
24
25pub enum DatagramSocket {
26    Udp(UdpSocket),
27    #[cfg(unix)]
28    Unix(UnixDatagram, PathBuf),
29}
30
31pub async fn send_datagrams<E: Encoder<Event, Error = vector_lib::codecs::encoding::Error>>(
32    input: &mut Peekable<BoxStream<'_, Event>>,
33    mut socket: DatagramSocket,
34    transformer: &Transformer,
35    encoder: &mut E,
36    chunker: &Option<Chunker>,
37    bytes_sent: &<BytesSent as RegisterInternalEvent>::Handle,
38) {
39    while let Some(mut event) = input.next().await {
40        transformer.transform(&mut event);
41        let finalizers = event.take_finalizers();
42        let mut bytes = BytesMut::new();
43
44        // Errors are handled by `Encoder`.
45        if encoder.encode(event, &mut bytes).is_err() {
46            finalizers.update_status(EventStatus::Errored);
47            continue;
48        }
49
50        let delivered = if let Some(chunker) = chunker {
51            let data_size = bytes.len();
52            match chunker.chunk(bytes.freeze()) {
53                Ok(chunks) => {
54                    let mut chunks_delivered = true;
55                    for bytes in chunks {
56                        if !send_and_emit(&mut socket, &bytes, bytes_sent).await {
57                            chunks_delivered = false;
58                            break;
59                        }
60                    }
61                    chunks_delivered
62                }
63                Err(err) => {
64                    emit!(UdpChunkingError {
65                        data_size,
66                        error: err
67                    });
68                    false
69                }
70            }
71        } else {
72            send_and_emit(&mut socket, &bytes.freeze(), bytes_sent).await
73        };
74
75        if delivered {
76            finalizers.update_status(EventStatus::Delivered);
77        } else {
78            finalizers.update_status(EventStatus::Errored);
79        }
80    }
81}
82
83async fn send_and_emit(
84    socket: &mut DatagramSocket,
85    bytes: &bytes::Bytes,
86    bytes_sent: &<BytesSent as RegisterInternalEvent>::Handle,
87) -> bool {
88    match send_datagram(socket, bytes).await {
89        Ok(()) => {
90            emit!(SocketEventsSent {
91                mode: match socket {
92                    DatagramSocket::Udp(_) => SocketMode::Udp,
93                    #[cfg(unix)]
94                    DatagramSocket::Unix(..) => SocketMode::Unix,
95                },
96                count: 1,
97                byte_size: bytes.len().into(),
98            });
99            bytes_sent.emit(ByteSize(bytes.len()));
100            true
101        }
102        Err(error) => {
103            match socket {
104                DatagramSocket::Udp(_) => emit!(SocketSendError {
105                    mode: SocketMode::Udp,
106                    error
107                }),
108                #[cfg(unix)]
109                DatagramSocket::Unix(_, path) => {
110                    emit!(UnixSocketSendError {
111                        path: path.as_path(),
112                        error: &error
113                    })
114                }
115            };
116            false
117        }
118    }
119}
120
121async fn send_datagram(socket: &mut DatagramSocket, buf: &[u8]) -> tokio::io::Result<()> {
122    let sent = match socket {
123        DatagramSocket::Udp(udp) => udp.send(buf).await,
124        #[cfg(unix)]
125        DatagramSocket::Unix(uds, _) => uds.send(buf).await,
126    }?;
127    if sent != buf.len() {
128        match socket {
129            DatagramSocket::Udp(_) => emit!(UdpSendIncompleteError {
130                data_size: buf.len(),
131                sent,
132            }),
133            #[cfg(unix)]
134            DatagramSocket::Unix(..) => emit!(UnixSendIncompleteError {
135                data_size: buf.len(),
136                sent,
137            }),
138        }
139    }
140    Ok(())
141}