vector/sinks/util/
datagram.rs1use bytes::BytesMut;
2use futures::{StreamExt, stream::BoxStream};
3use futures_util::stream::Peekable;
4#[cfg(unix)]
5use std::path::PathBuf;
6use tokio::net::UdpSocket;
7#[cfg(unix)]
8use tokio::net::UnixDatagram;
9use tokio_util::codec::Encoder;
10use vector_lib::{
11 codecs::encoding::{Chunker, Chunking},
12 internal_event::{ByteSize, BytesSent, InternalEventHandle, RegisterInternalEvent},
13};
14
15#[cfg(unix)]
16use crate::internal_events::{UnixSendIncompleteError, UnixSocketSendError};
17use crate::{
18 codecs::Transformer,
19 event::{Event, EventStatus, Finalizable},
20 internal_events::{
21 SocketEventsSent, SocketMode, SocketSendError, UdpChunkingError, UdpSendIncompleteError,
22 },
23};
24
25pub enum DatagramSocket {
26 Udp(UdpSocket),
27 #[cfg(unix)]
28 Unix(UnixDatagram, PathBuf),
29}
30
31pub async fn send_datagrams<E: Encoder<Event, Error = vector_lib::codecs::encoding::Error>>(
32 input: &mut Peekable<BoxStream<'_, Event>>,
33 mut socket: DatagramSocket,
34 transformer: &Transformer,
35 encoder: &mut E,
36 chunker: &Option<Chunker>,
37 bytes_sent: &<BytesSent as RegisterInternalEvent>::Handle,
38) {
39 while let Some(mut event) = input.next().await {
40 transformer.transform(&mut event);
41 let finalizers = event.take_finalizers();
42 let mut bytes = BytesMut::new();
43
44 if encoder.encode(event, &mut bytes).is_err() {
46 finalizers.update_status(EventStatus::Errored);
47 continue;
48 }
49
50 let delivered = if let Some(chunker) = chunker {
51 let data_size = bytes.len();
52 match chunker.chunk(bytes.freeze()) {
53 Ok(chunks) => {
54 let mut chunks_delivered = true;
55 for bytes in chunks {
56 if !send_and_emit(&mut socket, &bytes, bytes_sent).await {
57 chunks_delivered = false;
58 break;
59 }
60 }
61 chunks_delivered
62 }
63 Err(err) => {
64 emit!(UdpChunkingError {
65 data_size,
66 error: err
67 });
68 false
69 }
70 }
71 } else {
72 send_and_emit(&mut socket, &bytes.freeze(), bytes_sent).await
73 };
74
75 if delivered {
76 finalizers.update_status(EventStatus::Delivered);
77 } else {
78 finalizers.update_status(EventStatus::Errored);
79 }
80 }
81}
82
83async fn send_and_emit(
84 socket: &mut DatagramSocket,
85 bytes: &bytes::Bytes,
86 bytes_sent: &<BytesSent as RegisterInternalEvent>::Handle,
87) -> bool {
88 match send_datagram(socket, bytes).await {
89 Ok(()) => {
90 emit!(SocketEventsSent {
91 mode: match socket {
92 DatagramSocket::Udp(_) => SocketMode::Udp,
93 #[cfg(unix)]
94 DatagramSocket::Unix(..) => SocketMode::Unix,
95 },
96 count: 1,
97 byte_size: bytes.len().into(),
98 });
99 bytes_sent.emit(ByteSize(bytes.len()));
100 true
101 }
102 Err(error) => {
103 match socket {
104 DatagramSocket::Udp(_) => emit!(SocketSendError {
105 mode: SocketMode::Udp,
106 error
107 }),
108 #[cfg(unix)]
109 DatagramSocket::Unix(_, path) => {
110 emit!(UnixSocketSendError {
111 path: path.as_path(),
112 error: &error
113 })
114 }
115 };
116 false
117 }
118 }
119}
120
121async fn send_datagram(socket: &mut DatagramSocket, buf: &[u8]) -> tokio::io::Result<()> {
122 let sent = match socket {
123 DatagramSocket::Udp(udp) => udp.send(buf).await,
124 #[cfg(unix)]
125 DatagramSocket::Unix(uds, _) => uds.send(buf).await,
126 }?;
127 if sent != buf.len() {
128 match socket {
129 DatagramSocket::Udp(_) => emit!(UdpSendIncompleteError {
130 data_size: buf.len(),
131 sent,
132 }),
133 #[cfg(unix)]
134 DatagramSocket::Unix(..) => emit!(UnixSendIncompleteError {
135 data_size: buf.len(),
136 sent,
137 }),
138 }
139 }
140 Ok(())
141}